
Extended Forecasting Tutorial#

[1]:
%matplotlib inline
import mxnet as mx
from mxnet import gluon
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import json
import os
from itertools import islice
from pathlib import Path
[2]:
mx.random.seed(0)
np.random.seed(0)

Datasets#

The first requirement to use GluonTS is to have an appropriate dataset. GluonTS offers three different options to practitioners who want to experiment with the various modules:

  • Use an available dataset provided by GluonTS

  • Create an artificial dataset using GluonTS

  • Convert your dataset to a GluonTS friendly format

In general, a dataset should satisfy some minimum format requirements to be compatible with GluonTS. In particular, it should be an iterable collection of data entries (time series), and each entry should have at least a target field, which contains the actual values of the time series, and a start field, which denotes the starting date of the time series. There are many more optional fields that we will go through in this tutorial.
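For example, a minimal dataset in this format could be sketched as a plain list of dictionaries (assuming an hourly frequency and pandas Period start timestamps, as used later in this tutorial):

# a minimal sketch of a GluonTS-compatible dataset: an iterable of entries,
# each with at least a "target" array and a "start" timestamp
my_minimal_dataset = [
    {"start": pd.Period("2019-01-01 00:00", freq="H"), "target": np.random.randn(200)},
    {"start": pd.Period("2019-01-01 00:00", freq="H"), "target": np.random.randn(150)},
]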

The datasets provided by GluonTS come in the appropriate format and can be used without any post-processing. However, a custom dataset needs to be converted. Fortunately, this is an easy task.

Available datasets in GluonTS#

GluonTS comes with a number of available datasets.

[3]:
from gluonts.dataset.repository import get_dataset, dataset_names
from gluonts.dataset.util import to_pandas
[4]:
print(f"Available datasets: {dataset_names}")
Available datasets: ['constant', 'exchange_rate', 'solar-energy', 'electricity', 'traffic', 'exchange_rate_nips', 'electricity_nips', 'traffic_nips', 'solar_nips', 'wiki2000_nips', 'wiki-rolling_nips', 'taxi_30min', 'kaggle_web_traffic_with_missing', 'kaggle_web_traffic_without_missing', 'kaggle_web_traffic_weekly', 'm1_yearly', 'm1_quarterly', 'm1_monthly', 'nn5_daily_with_missing', 'nn5_daily_without_missing', 'nn5_weekly', 'tourism_monthly', 'tourism_quarterly', 'tourism_yearly', 'cif_2016', 'london_smart_meters_without_missing', 'wind_farms_without_missing', 'car_parts_without_missing', 'dominick', 'fred_md', 'pedestrian_counts', 'hospital', 'covid_deaths', 'kdd_cup_2018_without_missing', 'weather', 'm3_monthly', 'm3_quarterly', 'm3_yearly', 'm3_other', 'm4_hourly', 'm4_daily', 'm4_weekly', 'm4_monthly', 'm4_quarterly', 'm4_yearly', 'm5', 'uber_tlc_daily', 'uber_tlc_hourly', 'airpassengers', 'australian_electricity_demand', 'electricity_hourly', 'electricity_weekly', 'rideshare_without_missing', 'saugeenday', 'solar_10_minutes', 'solar_weekly', 'sunspot_without_missing', 'temperature_rain_without_missing', 'vehicle_trips_without_missing']

To download one of the built-in datasets, simply call get_dataset with one of the above names. GluonTS can re-use the saved dataset so that it does not need to be downloaded again the next time around.

[5]:
dataset = get_dataset("m4_hourly")

What is in a dataset?#

In general, the datasets provided by GluonTS are objects that consist of three main members:

  • dataset.train is an iterable collection of data entries used for training. Each entry corresponds to one time series.

  • dataset.test is an iterable collection of data entries used for inference. The test dataset is an extended version of the train dataset that contains a window at the end of each time series that was not seen during training. This window has length equal to the recommended prediction length.

  • dataset.metadata contains metadata of the dataset such as the frequency of the time series, a recommended prediction horizon, associated features, etc.

First, let’s see what the first entry of the train dataset contains. We should expect at least a target and a start field in each entry, and the target of the test entry to have an additional window of length equal to prediction_length.

[6]:
# get the first time series in the training set
train_entry = next(iter(dataset.train))
train_entry.keys()
[6]:
dict_keys(['target', 'start', 'feat_static_cat', 'item_id'])

We observe that apart from the required fields there is one more feat_static_cat field (we can safely ignore the item_id field). This shows that the dataset has some features apart from the values of the time series. For now, we will ignore the feat_static_cat field too. We will explain it in detail later with all the other optional fields.

We can similarly examine the first entry of the test dataset. We should expect exactly the same fields as in the train dataset.

[7]:
# get the first time series in the test set
test_entry = next(iter(dataset.test))
test_entry.keys()
[7]:
dict_keys(['target', 'start', 'feat_static_cat', 'item_id'])

Moreover, we should expect that the target will have an additional window at the end with length equal to prediction_length. To better understand what this means we can visualize both the train and test time series.

[8]:
test_series = to_pandas(test_entry)
train_series = to_pandas(train_entry)

fig, ax = plt.subplots(2, 1, sharex=True, sharey=True, figsize=(10, 7))

train_series.plot(ax=ax[0])
ax[0].grid(which="both")
ax[0].legend(["train series"], loc="upper left")

test_series.plot(ax=ax[1])
ax[1].axvline(train_series.index[-1], color="r")  # end of train dataset
ax[1].grid(which="both")
ax[1].legend(["test series", "end of train series"], loc="upper left")

plt.show()
../../_images/tutorials_forecasting_extended_tutorial_13_0.png
[9]:
print(
    f"Length of forecasting window in test dataset: {len(test_series) - len(train_series)}"
)
print(f"Recommended prediction horizon: {dataset.metadata.prediction_length}")
print(f"Frequency of the time series: {dataset.metadata.freq}")
Length of forecasting window in test dataset: 48
Recommended prediction horizon: 48
Frequency of the time series: H

Create artificial datasets#

We can easily create a complex artificial time series dataset using the ComplexSeasonalTimeSeries module.

[10]:
from gluonts.dataset.artificial import ComplexSeasonalTimeSeries
from gluonts.dataset.common import ListDataset
[11]:
artificial_dataset = ComplexSeasonalTimeSeries(
    num_series=10,
    prediction_length=21,
    freq_str="H",
    length_low=30,
    length_high=200,
    min_val=-10000,
    max_val=10000,
    is_integer=False,
    proportion_missing_values=0,
    is_noise=True,
    is_scale=True,
    percentage_unique_timestamps=1,
    is_out_of_bounds_date=True,
)

We can access some important metadata of the artificial dataset as follows:

[12]:
print(f"prediction length: {artificial_dataset.metadata.prediction_length}")
print(f"frequency: {artificial_dataset.metadata.freq}")
prediction length: 21
frequency: H

The artificial dataset that we created is a list of dictionaries. Each dictionary corresponds to a time series and it should contain the required fields.

[13]:
print(f"type of train dataset: {type(artificial_dataset.train)}")
print(f"train dataset fields: {artificial_dataset.train[0].keys()}")
print(f"type of test dataset: {type(artificial_dataset.test)}")
print(f"test dataset fields: {artificial_dataset.test[0].keys()}")
type of train dataset: <class 'list'>
train dataset fields: dict_keys(['start', 'target', 'item_id'])
type of test dataset: <class 'list'>
test dataset fields: dict_keys(['start', 'target', 'item_id'])

In order to use the artificially created datasets (list of dictionaries) we need to convert them to ListDataset objects.

[14]:
train_ds = ListDataset(artificial_dataset.train, freq=artificial_dataset.metadata.freq)
[15]:
test_ds = ListDataset(artificial_dataset.test, freq=artificial_dataset.metadata.freq)
[16]:
train_entry = next(iter(train_ds))
train_entry.keys()
[16]:
dict_keys(['start', 'target', 'item_id'])
[17]:
test_entry = next(iter(test_ds))
test_entry.keys()
[17]:
dict_keys(['start', 'target', 'item_id'])
[18]:
test_series = to_pandas(test_entry)
train_series = to_pandas(train_entry)

fig, ax = plt.subplots(2, 1, sharex=True, sharey=True, figsize=(10, 7))

train_series.plot(ax=ax[0])
ax[0].grid(which="both")
ax[0].legend(["train series"], loc="upper left")

test_series.plot(ax=ax[1])
ax[1].axvline(train_series.index[-1], color="r")  # end of train dataset
ax[1].grid(which="both")
ax[1].legend(["test series", "end of train series"], loc="upper left")

plt.show()
../../_images/tutorials_forecasting_extended_tutorial_27_0.png

Use your time series and features#

Now, we will see how we can convert any custom dataset with any associated features to an appropriate format for GluonTS.

As already mentioned, a dataset is required to have at least the target and the start fields. However, it may have more. Let’s see what all the available fields are:

[19]:
from gluonts.dataset.field_names import FieldName
[20]:
[
    f"FieldName.{k} = '{v}'"
    for k, v in FieldName.__dict__.items()
    if not k.startswith("_")
]
[20]:
["FieldName.ITEM_ID = 'item_id'",
 "FieldName.INFO = 'info'",
 "FieldName.START = 'start'",
 "FieldName.TARGET = 'target'",
 "FieldName.FEAT_STATIC_CAT = 'feat_static_cat'",
 "FieldName.FEAT_STATIC_REAL = 'feat_static_real'",
 "FieldName.FEAT_DYNAMIC_CAT = 'feat_dynamic_cat'",
 "FieldName.FEAT_DYNAMIC_REAL = 'feat_dynamic_real'",
 "FieldName.PAST_FEAT_DYNAMIC_CAT = 'past_feat_dynamic_cat'",
 "FieldName.PAST_FEAT_DYNAMIC_REAL = 'past_feat_dynamic_real'",
 "FieldName.FEAT_DYNAMIC_REAL_LEGACY = 'dynamic_feat'",
 "FieldName.FEAT_DYNAMIC = 'feat_dynamic'",
 "FieldName.PAST_FEAT_DYNAMIC = 'past_feat_dynamic'",
 "FieldName.FEAT_TIME = 'time_feat'",
 "FieldName.FEAT_CONST = 'feat_dynamic_const'",
 "FieldName.FEAT_AGE = 'feat_dynamic_age'",
 "FieldName.OBSERVED_VALUES = 'observed_values'",
 "FieldName.IS_PAD = 'is_pad'",
 "FieldName.FORECAST_START = 'forecast_start'",
 "FieldName.TARGET_DIM_INDICATOR = 'target_dimension_indicator'"]

The fields are split into three categories: the required ones, the optional ones, and the ones that can be added by the Transformation (explained shortly).

Required:

  • start: start date of the time series

  • target: values of the time series

Optional:

  • feat_static_cat: static (over time) categorical features, list with dimension equal to the number of features

  • feat_static_real: static (over time) real features, list with dimension equal to the number of features

  • feat_dynamic_cat: dynamic (over time) categorical features, array with shape equal to (number of features, target length)

  • feat_dynamic_real: dynamic (over time) real features, array with shape equal to (number of features, target length)

Added by Transformation:

  • time_feat: time related features such as the month or the day

  • feat_dynamic_const: expands a constant value feature along the time axis

  • feat_dynamic_age: age feature, i.e., a feature whose value is small for distant past timestamps and increases monotonically as we approach the current timestamp

  • observed_values: indicator for observed values, i.e., a feature that equals 1 if the value is observed and 0 if the value is missing

  • is_pad: indicator for each time step that shows whether it is padded (used when the series is not long enough)

  • forecast_start: forecast start date

As a simple example, we can create a custom dataset to see how we can use some of these fields. The dataset consists of a target, a real dynamic feature (which in this example we set to be the target value one period earlier), and a static categorical feature that indicates the sinusoid type (different phase) that we used to create the target.

[21]:
def create_dataset(num_series, num_steps, period=24, mu=1, sigma=0.3):
    # create target: noise + pattern
    # noise
    noise = np.random.normal(mu, sigma, size=(num_series, num_steps))

    # pattern - sinusoid with different phase
    sin_minusPi_Pi = np.sin(
        np.tile(np.linspace(-np.pi, np.pi, period), int(num_steps / period))
    )
    sin_Zero_2Pi = np.sin(
        np.tile(np.linspace(0, 2 * np.pi, period), int(num_steps / period))
    )

    pattern = np.concatenate(
        (
            np.tile(sin_minusPi_Pi.reshape(1, -1), (int(np.ceil(num_series / 2)), 1)),
            np.tile(sin_Zero_2Pi.reshape(1, -1), (int(np.floor(num_series / 2)), 1)),
        ),
        axis=0,
    )

    target = noise + pattern

    # create time features: use the target shifted one period back, padding the first period with zeros
    feat_dynamic_real = np.concatenate(
        (np.zeros((num_series, period)), target[:, :-period]), axis=1
    )

    # create categorical static feats: use the sinusoid type as a categorical feature
    feat_static_cat = np.concatenate(
        (
            np.zeros(int(np.ceil(num_series / 2))),
            np.ones(int(np.floor(num_series / 2))),
        ),
        axis=0,
    )

    return target, feat_dynamic_real, feat_static_cat
[22]:
# define the parameters of the dataset
custom_ds_metadata = {
    "num_series": 100,
    "num_steps": 24 * 7,
    "prediction_length": 24,
    "freq": "1H",
    "start": [pd.Period("01-01-2019", freq="1H") for _ in range(100)],
}
[23]:
data_out = create_dataset(
    custom_ds_metadata["num_series"],
    custom_ds_metadata["num_steps"],
    custom_ds_metadata["prediction_length"],
)

target, feat_dynamic_real, feat_static_cat = data_out

We can easily create the train and test datasets by simply filling in the correct fields. Remember that for the train dataset we need to cut the last window.

[24]:
train_ds = ListDataset(
    [
        {
            FieldName.TARGET: target,
            FieldName.START: start,
            FieldName.FEAT_DYNAMIC_REAL: [fdr],
            FieldName.FEAT_STATIC_CAT: [fsc],
        }
        for (target, start, fdr, fsc) in zip(
            target[:, : -custom_ds_metadata["prediction_length"]],
            custom_ds_metadata["start"],
            feat_dynamic_real[:, : -custom_ds_metadata["prediction_length"]],
            feat_static_cat,
        )
    ],
    freq=custom_ds_metadata["freq"],
)
[25]:
test_ds = ListDataset(
    [
        {
            FieldName.TARGET: target,
            FieldName.START: start,
            FieldName.FEAT_DYNAMIC_REAL: [fdr],
            FieldName.FEAT_STATIC_CAT: [fsc],
        }
        for (target, start, fdr, fsc) in zip(
            target, custom_ds_metadata["start"], feat_dynamic_real, feat_static_cat
        )
    ],
    freq=custom_ds_metadata["freq"],
)

Now, we can examine each entry of the train and test datasets. We should expect that they have the following fields: target, start, feat_dynamic_real and feat_static_cat.

[26]:
train_entry = next(iter(train_ds))
train_entry.keys()
[26]:
dict_keys(['target', 'start', 'feat_dynamic_real', 'feat_static_cat'])
[27]:
test_entry = next(iter(test_ds))
test_entry.keys()
[27]:
dict_keys(['target', 'start', 'feat_dynamic_real', 'feat_static_cat'])
[28]:
test_series = to_pandas(test_entry)
train_series = to_pandas(train_entry)

fig, ax = plt.subplots(2, 1, sharex=True, sharey=True, figsize=(10, 7))

train_series.plot(ax=ax[0])
ax[0].grid(which="both")
ax[0].legend(["train series"], loc="upper left")

test_series.plot(ax=ax[1])
ax[1].axvline(train_series.index[-1], color="r")  # end of train dataset
ax[1].grid(which="both")
ax[1].legend(["test series", "end of train series"], loc="upper left")

plt.show()
../../_images/tutorials_forecasting_extended_tutorial_41_0.png

For the rest of the tutorial we will use this custom dataset.

Transformations#

Define a transformation#

The primary use cases for a Transformation are feature processing (e.g., adding a holiday feature) and defining the way the dataset will be split into appropriate windows during training and inference.

In general, it gets an iterable collection of entries of a dataset and transforms it into another iterable collection that can possibly contain more fields. The transformation is done by defining a set of "actions" applied to the raw dataset, depending on what is useful to our model. These actions usually create additional features or transform an existing feature. As an example, below we add the following transformations:

  • AddObservedValuesIndicator: Creates the observed_values field in the dataset, i.e., adds a feature that equals 1 if the value is observed and 0 if the value is missing

  • AddAgeFeature: Creates the feat_dynamic_age field in the dataset, i.e., adds a feature whose value is small for distant past timestamps and increases monotonically as we approach the current timestamp

One more transformation that can be used is the InstanceSplitter, which defines how the dataset is going to be split into example windows during training, validation, or at prediction time. The InstanceSplitter is configured as follows (skipping the obvious fields):

  • is_pad_field: indicator of whether the time series is padded (when its length is not enough)

  • train_sampler: defines how the training windows are cut/sampled

  • time_series_fields: contains the time-dependent features that need to be split in the same manner as the target

[29]:
from gluonts.transform import (
    AddAgeFeature,
    AddObservedValuesIndicator,
    Chain,
    ExpectedNumInstanceSampler,
    InstanceSplitter,
    SetFieldIfNotPresent,
)
[30]:
def create_transformation(freq, context_length, prediction_length):
    return Chain(
        [
            AddObservedValuesIndicator(
                target_field=FieldName.TARGET,
                output_field=FieldName.OBSERVED_VALUES,
            ),
            AddAgeFeature(
                target_field=FieldName.TARGET,
                output_field=FieldName.FEAT_AGE,
                pred_length=prediction_length,
                log_scale=True,
            ),
            InstanceSplitter(
                target_field=FieldName.TARGET,
                is_pad_field=FieldName.IS_PAD,
                start_field=FieldName.START,
                forecast_start_field=FieldName.FORECAST_START,
                instance_sampler=ExpectedNumInstanceSampler(
                    num_instances=1,
                    min_future=prediction_length,
                ),
                past_length=context_length,
                future_length=prediction_length,
                time_series_fields=[
                    FieldName.FEAT_AGE,
                    FieldName.FEAT_DYNAMIC_REAL,
                    FieldName.OBSERVED_VALUES,
                ],
            ),
        ]
    )

Transform a dataset#

Now, we can create a transformation object using the function defined above, and apply it to the custom dataset we have created.

[31]:
transformation = create_transformation(
    custom_ds_metadata["freq"],
    2 * custom_ds_metadata["prediction_length"],  # can be any appropriate value
    custom_ds_metadata["prediction_length"],
)
[32]:
train_tf = transformation(iter(train_ds), is_train=True)
[33]:
type(train_tf)
[33]:
generator

As expected, the output is another iterable object. We can easily examine what is contained in an entry of the transformed dataset. The InstanceSplitter iterates over the transformed dataset and cuts windows by randomly selecting a time series and a starting point on that time series (this "randomness" is defined by the instance_sampler).

[34]:
train_tf_entry = next(iter(train_tf))
[k for k in train_tf_entry.keys()]
[34]:
['start',
 'feat_static_cat',
 'past_feat_dynamic_age',
 'future_feat_dynamic_age',
 'past_feat_dynamic_real',
 'future_feat_dynamic_real',
 'past_observed_values',
 'future_observed_values',
 'past_target',
 'future_target',
 'past_is_pad',
 'forecast_start']

The transformer has done what we asked. In particular it has added:

  • a field for observed values (observed_values)

  • a field for the age feature (feat_dynamic_age)

  • some extra useful fields (past_is_pad, forecast_start)

It has done one more important thing: it has split the window into past and future and has added the corresponding prefixes to all time-dependent fields. This way we can easily use, e.g., the past_target field as input and the future_target field to calculate the error of our predictions. Of course, the length of the past window equals context_length and that of the future window equals prediction_length.

[35]:
print(f"past target shape: {train_tf_entry['past_target'].shape}")
print(f"future target shape: {train_tf_entry['future_target'].shape}")
print(f"past observed values shape: {train_tf_entry['past_observed_values'].shape}")
print(f"future observed values shape: {train_tf_entry['future_observed_values'].shape}")
print(f"past age feature shape: {train_tf_entry['past_feat_dynamic_age'].shape}")
print(f"future age feature shape: {train_tf_entry['future_feat_dynamic_age'].shape}")
print(train_tf_entry["feat_static_cat"])
past target shape: (48,)
future target shape: (24,)
past observed values shape: (48,)
future observed values shape: (24,)
past age feature shape: (48, 1)
future age feature shape: (24, 1)
[0]

Just for comparison, let’s see again what the fields in the original dataset were before the transformation:

[36]:
[k for k in next(iter(train_ds)).keys()]
[36]:
['target', 'start', 'feat_dynamic_real', 'feat_static_cat']

Now, we can move on and see how the test dataset is split. As we saw, the transformation splits the windows into past and future. However, during inference (is_train=False in the transformation), the splitter always cuts the last window (of length context_length) of the dataset so it can be used to predict the subsequent unknown values of length prediction_length.

So, how is the test dataset split in past and future since we do not know the future target? And what about the time dependent features?

[37]:
test_tf = transformation(iter(test_ds), is_train=False)
[38]:
test_tf_entry = next(iter(test_tf))
[k for k in test_tf_entry.keys()]
[38]:
['start',
 'feat_static_cat',
 'past_feat_dynamic_age',
 'future_feat_dynamic_age',
 'past_feat_dynamic_real',
 'future_feat_dynamic_real',
 'past_observed_values',
 'future_observed_values',
 'past_target',
 'future_target',
 'past_is_pad',
 'forecast_start']
[39]:
print(f"past target shape: {test_tf_entry['past_target'].shape}")
print(f"future target shape: {test_tf_entry['future_target'].shape}")
print(f"past observed values shape: {test_tf_entry['past_observed_values'].shape}")
print(f"future observed values shape: {test_tf_entry['future_observed_values'].shape}")
print(f"past age feature shape: {test_tf_entry['past_feat_dynamic_age'].shape}")
print(f"future age feature shape: {test_tf_entry['future_feat_dynamic_age'].shape}")
print(test_tf_entry["feat_static_cat"])
past target shape: (48,)
future target shape: (24,)
past observed values shape: (48,)
future observed values shape: (24,)
past age feature shape: (48, 1)
future age feature shape: (24, 1)
[0]

The future target is empty but not the features - we always assume that we know the future features!

All the things we did manually here are done by an internal block called DataLoader. It takes as input the raw dataset (in the appropriate format) and the transformation object, and outputs the transformed iterable dataset batch by batch. The only thing that we need to worry about is setting the transformation fields correctly!
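As a rough sketch of how this fits together, the transformation we defined above could be handed to a TrainDataLoader along with the raw dataset (the batch size here is an arbitrary choice):

from functools import partial
from gluonts.dataset.loader import TrainDataLoader
from gluonts.mx import batchify

# sketch: the loader applies the transformation (including the InstanceSplitter)
# and stacks the resulting windows into batches
train_loader = TrainDataLoader(
    dataset=train_ds,
    transform=transformation,
    batch_size=32,
    stack_fn=partial(batchify, ctx=mx.cpu()),
)
batch = next(iter(train_loader))
print(batch["past_target"].shape)  # e.g. (32, 48), i.e., (batch_size, context_length)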

Training an existing model#

GluonTS comes with a number of pre-built models. All the user needs to do is configure some hyperparameters. The existing models focus on (but are not limited to) probabilistic forecasting. Probabilistic forecasts are predictions in the form of a probability distribution, rather than simply a single point estimate. Having estimated the future distribution of each time step in the forecasting horizon, we can draw a sample from the distribution at each time step and thus create a “sample path” that can be seen as a possible realization of the future. In practice we draw multiple samples and create multiple sample paths which can be used for visualization, evaluation of the model, to derive statistics, etc.

Configuring an estimator#

We will begin with GluonTS’s pre-built feedforward neural network estimator, a simple but powerful forecasting model. We will use this model to demonstrate the process of training a model, producing forecasts, and evaluating the results.

GluonTS’s built-in feedforward neural network (SimpleFeedForwardEstimator) accepts an input window of length context_length and predicts the distribution of the values of the subsequent prediction_length values. In GluonTS parlance, the feedforward neural network model is an example of Estimator. In GluonTS, Estimator objects represent a forecasting model as well as details such as its coefficients, weights, etc.

In general, each estimator (pre-built or custom) is configured by a number of hyperparameters that can be either common (though not mandatory) among all estimators (e.g., the prediction_length) or specific to the particular estimator (e.g., number of layers for a neural network or the stride in a CNN).

Finally, each estimator is configured by a Trainer, which defines how the model will be trained, i.e., the number of epochs, the learning rate, etc.

[40]:
from gluonts.mx import SimpleFeedForwardEstimator, Trainer
[41]:
estimator = SimpleFeedForwardEstimator(
    num_hidden_dimensions=[10],
    prediction_length=custom_ds_metadata["prediction_length"],
    context_length=2 * custom_ds_metadata["prediction_length"],
    trainer=Trainer(
        ctx="cpu",
        epochs=5,
        learning_rate=1e-3,
        hybridize=False,
        num_batches_per_epoch=100,
    ),
)

Getting a predictor#

After specifying our estimator with all the necessary hyperparameters we can train it using our training dataset train_ds by invoking the train method of the estimator. The training algorithm returns a fitted model (or a Predictor in GluonTS parlance) that can be used to construct forecasts.

We should emphasize here that a single model, as the one defined above, is trained over all the time series contained in the training dataset train_ds. This results in a global model, suitable for prediction for all the time series in train_ds and possibly for other unseen related time series.

[42]:
predictor = estimator.train(train_ds)
100%|██████████| 100/100 [00:00<00:00, 114.30it/s, epoch=1/5, avg_epoch_loss=1.28]
100%|██████████| 100/100 [00:00<00:00, 118.69it/s, epoch=2/5, avg_epoch_loss=0.735]
100%|██████████| 100/100 [00:00<00:00, 132.83it/s, epoch=3/5, avg_epoch_loss=0.677]
100%|██████████| 100/100 [00:00<00:00, 121.54it/s, epoch=4/5, avg_epoch_loss=0.644]
100%|██████████| 100/100 [00:00<00:00, 116.56it/s, epoch=5/5, avg_epoch_loss=0.605]

Saving/Loading an existing model#

A fitted model, i.e., a Predictor, can be saved and loaded back easily:

[43]:
# save the trained model in tmp/
from pathlib import Path

predictor.serialize(Path("/tmp/"))
WARNING:root:Serializing RepresentableBlockPredictor instances does not save the prediction network structure in a backwards-compatible manner. Be careful not to use this method in production.
[44]:
# loads it back
from gluonts.model.predictor import Predictor

predictor_deserialized = Predictor.deserialize(Path("/tmp/"))

Evaluation#

Getting the forecasts#

With a predictor in hand, we can now predict the last window of test_ds and evaluate our model’s performance.

GluonTS comes with the make_evaluation_predictions function that automates the process of prediction and model evaluation. Roughly, this function performs the following steps:

  • Removes the final window of length prediction_length of the test dataset that we want to predict

  • The predictor uses the remaining data to predict (in the form of sample paths) the "future" window that was just removed

  • The module outputs the forecast sample paths and the test dataset (as python generator objects)

[45]:
from gluonts.evaluation import make_evaluation_predictions
[46]:
forecast_it, ts_it = make_evaluation_predictions(
    dataset=test_ds,  # test dataset
    predictor=predictor,  # predictor
    num_samples=100,  # number of sample paths we want for evaluation
)

First, we can convert these generators to lists to ease the subsequent computations.

[47]:
forecasts = list(forecast_it)
tss = list(ts_it)

We can examine the first element of these lists (that corresponds to the first time series of the dataset). Let’s start with the list containing the time series, i.e., tss. We expect the first entry of tss to contain the (target of the) first time series of test_ds.

[48]:
# first entry of the time series list
ts_entry = tss[0]
[49]:
# first 5 values of the time series (convert from pandas to numpy)
np.array(ts_entry[:5]).reshape(
    -1,
)
[49]:
array([1.5292157 , 0.85025036, 0.7740374 , 0.941432  , 0.6723822 ],
      dtype=float32)
[50]:
# first entry of test_ds
test_ds_entry = next(iter(test_ds))
[51]:
# first 5 values
test_ds_entry["target"][:5]
[51]:
array([1.5292157 , 0.85025036, 0.7740374 , 0.941432  , 0.6723822 ],
      dtype=float32)

The entries in the forecast list are a bit more complex. They are objects that contain all the sample paths in the form of a numpy.ndarray with dimension (num_samples, prediction_length), the start date of the forecast, the frequency of the time series, etc. We can access all this information by simply invoking the corresponding attribute of the forecast object.

[52]:
# first entry of the forecast list
forecast_entry = forecasts[0]
[53]:
print(f"Number of sample paths: {forecast_entry.num_samples}")
print(f"Dimension of samples: {forecast_entry.samples.shape}")
print(f"Start date of the forecast window: {forecast_entry.start_date}")
print(f"Frequency of the time series: {forecast_entry.freq}")
Number of sample paths: 100
Dimension of samples: (100, 24)
Start date of the forecast window: 2019-01-07 00:00
Frequency of the time series: <Hour>

We can also do calculations to summarize the sample paths, such as computing the mean or a quantile for each of the 24 time steps in the forecast window.

[54]:
print(f"Mean of the future window:\n {forecast_entry.mean}")
print(f"0.5-quantile (median) of the future window:\n {forecast_entry.quantile(0.5)}")
Mean of the future window:
 [ 0.90765196  0.5002642   0.58970827  0.39372978  0.09580009  0.05871432
 -0.09855336  0.21287599  0.06411848  0.36389527  0.6184984   0.82686234
  1.1362944   1.4422988   1.5806047   1.885066    1.6728904   1.8457131
  2.0097268   1.7956799   1.6775795   1.587245    1.2241242   1.0475011 ]
0.5-quantile (median) of the future window:
 [ 9.2138731e-01  5.3099197e-01  5.4954773e-01  3.5585621e-01
  1.3284978e-01  5.4421678e-02 -5.3116154e-02  1.7734313e-01
 -1.0389383e-03  3.8981181e-01  6.4071041e-01  8.7102258e-01
  1.1050647e+00  1.2608457e+00  1.5636476e+00  1.9202025e+00
  1.7487336e+00  1.8408449e+00  2.0478199e+00  1.7664530e+00
  1.7128807e+00  1.6053847e+00  1.2772603e+00  1.0452789e+00]

Forecast objects have a plot method that can summarize the forecast paths as the mean, prediction intervals, etc. The prediction intervals are shaded in different colors as a “fan chart”.

[55]:
plt.plot(ts_entry[-150:].to_timestamp())
forecast_entry.plot(show_label=True)
plt.legend()
[55]:
<matplotlib.legend.Legend at 0x7effa46e2b80>
../../_images/tutorials_forecasting_extended_tutorial_84_1.png

Compute metrics#

We can also evaluate the quality of our forecasts numerically. In GluonTS, the Evaluator class can compute aggregate performance metrics, as well as metrics per time series (which can be useful for analyzing performance across heterogeneous time series).

[56]:
from gluonts.evaluation import Evaluator
[57]:
evaluator = Evaluator(quantiles=[0.1, 0.5, 0.9])
agg_metrics, item_metrics = evaluator(tss, forecasts)
Running evaluation: 100it [00:00, 2648.02it/s]

Aggregate metrics aggregate both across time-steps and across time series.

[58]:
print(json.dumps(agg_metrics, indent=4))
{
    "MSE": 0.11030595362186432,
    "abs_error": 631.219958782196,
    "abs_target_sum": 2505.765546798706,
    "abs_target_mean": 1.044068977832794,
    "seasonal_error": 0.3378558193842571,
    "MASE": 0.7856021114063411,
    "MAPE": 2.2423562081654866,
    "sMAPE": 0.5366955331961314,
    "MSIS": 5.561884554254895,
    "num_masked_target_values": 0.0,
    "QuantileLoss[0.1]": 274.70157045163216,
    "Coverage[0.1]": 0.09208333333333334,
    "QuantileLoss[0.5]": 631.2199574247934,
    "Coverage[0.5]": 0.4904166666666667,
    "QuantileLoss[0.9]": 295.76119372472164,
    "Coverage[0.9]": 0.8820833333333331,
    "RMSE": 0.332123401195797,
    "NRMSE": 0.3181048457978281,
    "ND": 0.2519070308028716,
    "wQuantileLoss[0.1]": 0.10962780249037385,
    "wQuantileLoss[0.5]": 0.25190703026115985,
    "wQuantileLoss[0.9]": 0.11803226926101591,
    "mean_absolute_QuantileLoss": 400.5609072003824,
    "mean_wQuantileLoss": 0.15985570067084987,
    "MAE_Coverage": 0.39402777777777764,
    "OWA": NaN
}

Individual metrics are aggregated only across time-steps.

[59]:
item_metrics.head()
[59]:
item_id forecast_start MSE abs_error abs_target_sum abs_target_mean seasonal_error MASE MAPE sMAPE num_masked_target_values ND MSIS QuantileLoss[0.1] Coverage[0.1] QuantileLoss[0.5] Coverage[0.5] QuantileLoss[0.9] Coverage[0.9]
0 None 2019-01-07 00:00 0.123998 7.011340 24.638548 1.026606 0.351642 0.830787 0.627694 0.607183 0.0 0.284568 5.906248 2.286915 0.083333 7.011340 0.541667 3.406847 0.875000
1 None 2019-01-07 00:00 0.109331 5.898170 22.178631 0.924110 0.340241 0.722302 1.439482 0.555920 0.0 0.265939 5.646115 2.598318 0.083333 5.898170 0.666667 3.049512 0.916667
2 None 2019-01-07 00:00 0.150442 6.913772 26.601139 1.108381 0.323560 0.890326 0.419949 0.536978 0.0 0.259905 6.923015 3.400284 0.125000 6.913772 0.500000 3.971697 0.791667
3 None 2019-01-07 00:00 0.124376 6.302595 22.502333 0.937597 0.311026 0.844329 1.099858 0.609671 0.0 0.280086 7.677102 3.067686 0.208333 6.302595 0.583333 3.414130 0.958333
4 None 2019-01-07 00:00 0.073736 4.519842 25.864388 1.077683 0.313119 0.601453 0.920273 0.425719 0.0 0.174752 6.257542 2.332712 0.041667 4.519842 0.500000 2.810995 0.916667
[60]:
item_metrics.plot(x="MSIS", y="MASE", kind="scatter")
plt.grid(which="both")
plt.show()
../../_images/tutorials_forecasting_extended_tutorial_92_0.png

Create your own model#

To create our own forecasting model we need to:

  • Define the training and prediction network

  • Define a new estimator that specifies any data processing and uses the networks

The training and prediction networks can be arbitrarily complex but they should follow some basic rules:

  • Both should have a hybrid_forward method that defines what should happen when the network is called

  • The training network’s hybrid_forward should return a loss based on the prediction and the true values

  • The prediction network’s hybrid_forward should return the predictions

The estimator should also include the following methods:

  • create_transformation, defining all the data pre-processing transformations (like adding features)

  • create_training_data_loader, constructing the data loader that yields training batches from a given dataset

  • create_training_network, returning the training network configured with any necessary hyperparameters

  • create_predictor, creating the prediction network, and returning a Predictor object

If a validation dataset is to be accepted, so that some validation metric can be computed, then the following should also be defined:

  • create_validation_data_loader

A Predictor defines the predict method of a given model. This method takes the test dataset, passes it through the prediction network to obtain the predictions, and yields them. You can think of the Predictor object as a wrapper of the prediction network that defines its predict method.
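As a rough sketch, using the feedforward predictor trained earlier, prediction boils down to:

# sketch: predict yields one Forecast object per time series in the dataset
for forecast in islice(predictor.predict(test_ds), 2):
    print(forecast.samples.shape)  # (num_samples, prediction_length)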

In this section, we will start simple by creating a feedforward network that is restricted to point forecasts. Then, we will add complexity to the network by expanding it to probabilistic forecasting, considering features and scaling of the time series, and in the end we will replace it with an RNN.

We need to emphasize that the way the following models are implemented and all the design choices that are made are neither binding nor necessarily optimal. Their sole purpose is to provide guidelines and hints on how to build a model.

Point forecasts with a simple feedforward network#

We can create a simple training network that defines a neural network that takes as input a window of length context_length and predicts the subsequent window of dimension prediction_length (thus, the output dimension of the network is prediction_length). The hybrid_forward method of the training network returns the mean of the L1 loss.

The prediction network is (and should be) identical to the training network (by inheriting the class) and its hybrid_forward method returns the predictions.

[61]:
class MyNetwork(gluon.HybridBlock):
    def __init__(self, prediction_length, num_cells, **kwargs):
        super().__init__(**kwargs)
        self.prediction_length = prediction_length
        self.num_cells = num_cells

        with self.name_scope():
            # Set up a 3 layer neural network that directly predicts the target values
            self.nn = mx.gluon.nn.HybridSequential()
            self.nn.add(mx.gluon.nn.Dense(units=self.num_cells, activation="relu"))
            self.nn.add(mx.gluon.nn.Dense(units=self.num_cells, activation="relu"))
            self.nn.add(
                mx.gluon.nn.Dense(units=self.prediction_length, activation="softrelu")
            )


class MyTrainNetwork(MyNetwork):
    def hybrid_forward(self, F, past_target, future_target):
        prediction = self.nn(past_target)
        # calculate L1 loss with the future_target to learn the median
        return (prediction - future_target).abs().mean(axis=-1)


class MyPredNetwork(MyTrainNetwork):
    # The prediction network only receives past_target and returns predictions
    def hybrid_forward(self, F, past_target):
        prediction = self.nn(past_target)
        return prediction.expand_dims(axis=1)

The estimator class is configured by a few hyperparameters and implements the required methods.

[62]:
from functools import partial
from mxnet.gluon import HybridBlock
from gluonts.core.component import validated
from gluonts.dataset.loader import TrainDataLoader
from gluonts.model.predictor import Predictor
from gluonts.mx import (
    batchify,
    copy_parameters,
    get_hybrid_forward_input_names,
    GluonEstimator,
    RepresentableBlockPredictor,
    Trainer,
)
from gluonts.transform import (
    ExpectedNumInstanceSampler,
    Transformation,
    InstanceSplitter,
    TestSplitSampler,
    SelectFields,
    Chain,
)
[63]:
class MyEstimator(GluonEstimator):
    @validated()
    def __init__(
        self,
        prediction_length: int,
        context_length: int,
        num_cells: int,
        batch_size: int = 32,
        trainer: Trainer = Trainer(),
    ) -> None:
        super().__init__(trainer=trainer, batch_size=batch_size)
        self.prediction_length = prediction_length
        self.context_length = context_length
        self.num_cells = num_cells

    def create_transformation(self):
        return Chain([])

    def create_training_data_loader(self, dataset, **kwargs):
        instance_splitter = InstanceSplitter(
            target_field=FieldName.TARGET,
            is_pad_field=FieldName.IS_PAD,
            start_field=FieldName.START,
            forecast_start_field=FieldName.FORECAST_START,
            instance_sampler=ExpectedNumInstanceSampler(
                num_instances=1, min_future=self.prediction_length
            ),
            past_length=self.context_length,
            future_length=self.prediction_length,
        )
        input_names = get_hybrid_forward_input_names(MyTrainNetwork)
        return TrainDataLoader(
            dataset=dataset,
            transform=instance_splitter + SelectFields(input_names),
            batch_size=self.batch_size,
            stack_fn=partial(batchify, ctx=self.trainer.ctx, dtype=self.dtype),
            **kwargs,
        )

    def create_training_network(self) -> MyTrainNetwork:
        return MyTrainNetwork(
            prediction_length=self.prediction_length, num_cells=self.num_cells
        )

    def create_predictor(
        self, transformation: Transformation, trained_network: HybridBlock
    ) -> Predictor:
        prediction_splitter = InstanceSplitter(
            target_field=FieldName.TARGET,
            is_pad_field=FieldName.IS_PAD,
            start_field=FieldName.START,
            forecast_start_field=FieldName.FORECAST_START,
            instance_sampler=TestSplitSampler(),
            past_length=self.context_length,
            future_length=self.prediction_length,
        )

        prediction_network = MyPredNetwork(
            prediction_length=self.prediction_length, num_cells=self.num_cells
        )

        copy_parameters(trained_network, prediction_network)

        return RepresentableBlockPredictor(
            input_transform=transformation + prediction_splitter,
            prediction_net=prediction_network,
            batch_size=self.batch_size,
            prediction_length=self.prediction_length,
            ctx=self.trainer.ctx,
        )

After defining the training and prediction network, as well as the estimator class, we can follow exactly the same steps as with the existing models, i.e., we can specify our estimator by passing all the required hyperparameters to the estimator class, train the estimator by invoking its train method to create a predictor, and finally use the make_evaluation_predictions function to generate our forecasts.

[64]:
estimator = MyEstimator(
    prediction_length=custom_ds_metadata["prediction_length"],
    context_length=2 * custom_ds_metadata["prediction_length"],
    num_cells=40,
    trainer=Trainer(
        ctx="cpu",
        epochs=5,
        learning_rate=1e-3,
        hybridize=False,
        num_batches_per_epoch=100,
    ),
)

The estimator can be trained using our training dataset train_ds just by invoking its train method. The training returns a predictor that can be used to predict.

[65]:
predictor = estimator.train(train_ds)
100%|██████████| 100/100 [00:00<00:00, 297.58it/s, epoch=1/5, avg_epoch_loss=0.48]
100%|██████████| 100/100 [00:00<00:00, 281.48it/s, epoch=2/5, avg_epoch_loss=0.319]
100%|██████████| 100/100 [00:00<00:00, 294.84it/s, epoch=3/5, avg_epoch_loss=0.302]
100%|██████████| 100/100 [00:00<00:00, 301.82it/s, epoch=4/5, avg_epoch_loss=0.289]
100%|██████████| 100/100 [00:00<00:00, 285.97it/s, epoch=5/5, avg_epoch_loss=0.288]
[66]:
forecast_it, ts_it = make_evaluation_predictions(
    dataset=test_ds,  # test dataset
    predictor=predictor,  # predictor
    num_samples=100,  # number of sample paths we want for evaluation
)
[67]:
forecasts = list(forecast_it)
tss = list(ts_it)
[68]:
plt.plot(tss[0][-150:].to_timestamp())
forecasts[0].plot(show_label=True)
plt.legend()
[68]:
<matplotlib.legend.Legend at 0x7effa4734d60>
../../_images/tutorials_forecasting_extended_tutorial_104_1.png

Observe that we cannot actually see any prediction intervals in the predictions. This is expected since the model that we defined does not do probabilistic forecasting but just gives point estimates. By requesting 100 sample paths (set in make_evaluation_predictions) from such a network, we get the same output 100 times.
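We can quickly verify this; since the network output is deterministic, all sample paths of a forecast should coincide (a small sanity check):

# all sample paths are identical for this point-forecast model
print(np.allclose(forecasts[0].samples, forecasts[0].samples[0]))  # expect True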

Probabilistic forecasting#

How does a model learn a distribution?#

Probabilistic forecasting requires that we learn the distribution of the future values of the time series and not the values themselves as in point forecasting. To achieve this, we need to specify the type of distribution that the future values follow. GluonTS comes with a number of different distributions that cover many use cases, such as Gaussian, Student-t and Uniform just to name a few.

In order to learn a distribution we need to learn its parameters. For example, in the simple case where we assume a Gaussian distribution, we need to learn the mean and the variance that fully specify the distribution.

Each distribution that is available in GluonTS is defined by the corresponding Distribution class (e.g., Gaussian). This class defines, among other things, the parameters of the distribution, its (log-)likelihood and a sampling method (given the parameters).

However, it is not straightforward how to connect a model with such a distribution and learn its parameters. For this, each distribution comes with a DistributionOutput class (e.g., GaussianOutput). The role of this class is to connect a model with a distribution. Its main usage is to take the output of the model and map it to the parameters of the distribution. You can think of it as an additional projection layer on top of the model. The parameters of this layer are optimized along with the rest of the network.

By including this projection layer, our model effectively learns the parameters of the (chosen) distribution of each time step. Such a model is usually optimized by choosing as a loss function the negative log-likelihood of the chosen distribution. After we optimize our model and learn the parameters we can sample or derive any other useful statistics from the learned distributions.
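As a small sketch of these mechanics, using the GaussianOutput that appears below (the feature dimension of 8 is an arbitrary assumption):

from gluonts.mx import GaussianOutput

distr_output = GaussianOutput()
proj = distr_output.get_args_proj()  # projection layer: features -> (mu, sigma)
proj.initialize()

# pretend network output with shape (batch, prediction_length, num_features)
net_output = mx.nd.random.normal(shape=(1, 24, 8))
distr_args = proj(net_output)                  # distribution parameters per time step
distr = distr_output.distribution(distr_args)  # one Gaussian per time step
print(distr.sample().shape)                    # (1, 24): one sample path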

Feedforward network for probabilistic forecasting#

Let’s see what changes we need to make to the previous model to make it probabilistic:

  • First, we need to change the output of the network. In the point forecast network the output was a vector of length prediction_length that gave directly the point estimates. Now, we need to output a set of features that the DistributionOutput will use to project to the distribution parameters. These features should be different for each time step at the prediction horizon. Therefore we need an overall output of prediction_length * num_features values.

  • The DistributionOutput takes as input a tensor and uses the last dimension as features to be projected to the distribution parameters. Here, we need a distribution object for each time step, i.e., prediction_length distribution objects. Given that the output of the network has prediction_length * num_features values, we can reshape it to (prediction_length, num_features) and get the required distributions, while the last axis of length num_features will be projected to the distribution parameters.

  • We want the prediction network to output many sample paths for each time series. To achieve this we can repeat each time series as many times as the number of sample paths and do a standard forecast for each of them.

Note that in all the tensors that we handle there is an initial dimension that refers to the batch, e.g., the output of the network has dimension (batch_size, prediction_length * num_features).

[69]:
from gluonts.mx import DistributionOutput, GaussianOutput
[70]:
class MyProbNetwork(gluon.HybridBlock):
    def __init__(
        self, prediction_length, distr_output, num_cells, num_sample_paths=100, **kwargs
    ) -> None:
        super().__init__(**kwargs)
        self.prediction_length = prediction_length
        self.distr_output = distr_output
        self.num_cells = num_cells
        self.num_sample_paths = num_sample_paths
        self.proj_distr_args = distr_output.get_args_proj()

        with self.name_scope():
            # Set up a 2-layer neural network whose output will be projected to the distribution parameters
            self.nn = mx.gluon.nn.HybridSequential()
            self.nn.add(mx.gluon.nn.Dense(units=self.num_cells, activation="relu"))
            self.nn.add(
                mx.gluon.nn.Dense(
                    units=self.prediction_length * self.num_cells, activation="relu"
                )
            )


class MyProbTrainNetwork(MyProbNetwork):
    def hybrid_forward(self, F, past_target, future_target):
        # compute network output
        net_output = self.nn(past_target)

        # (batch, prediction_length * nn_features)  ->  (batch, prediction_length, nn_features)
        net_output = net_output.reshape(0, self.prediction_length, -1)

        # project network output to distribution parameters domain
        distr_args = self.proj_distr_args(net_output)

        # compute distribution
        distr = self.distr_output.distribution(distr_args)

        # negative log-likelihood
        loss = distr.loss(future_target)
        return loss


class MyProbPredNetwork(MyProbTrainNetwork):
    # The prediction network only receives past_target and returns predictions
    def hybrid_forward(self, F, past_target):
        # repeat past target: from (batch_size, past_target_length) to
        # (batch_size * num_sample_paths, past_target_length)
        repeated_past_target = past_target.repeat(repeats=self.num_sample_paths, axis=0)

        # compute network output
        net_output = self.nn(repeated_past_target)

        # (batch * num_sample_paths, prediction_length * nn_features)  ->  (batch * num_sample_paths, prediction_length, nn_features)
        net_output = net_output.reshape(0, self.prediction_length, -1)

        # project network output to distribution parameters domain
        distr_args = self.proj_distr_args(net_output)

        # compute distribution
        distr = self.distr_output.distribution(distr_args)

        # get (batch_size * num_sample_paths, prediction_length) samples
        samples = distr.sample()

        # reshape from (batch_size * num_sample_paths, prediction_length) to
        # (batch_size, num_sample_paths, prediction_length)
        return samples.reshape(
            shape=(-1, self.num_sample_paths, self.prediction_length)
        )

The changes we need to make to the estimator are minor; they mainly reflect the additional distr_output parameter that our networks use.

[71]:
class MyProbEstimator(GluonEstimator):
    @validated()
    def __init__(
        self,
        prediction_length: int,
        context_length: int,
        distr_output: DistributionOutput,
        num_cells: int,
        num_sample_paths: int = 100,
        batch_size: int = 32,
        trainer: Trainer = Trainer(),
    ) -> None:
        super().__init__(trainer=trainer, batch_size=batch_size)
        self.prediction_length = prediction_length
        self.context_length = context_length
        self.distr_output = distr_output
        self.num_cells = num_cells
        self.num_sample_paths = num_sample_paths

    def create_transformation(self):
        return Chain([])

    def create_training_data_loader(self, dataset, **kwargs):
        instance_splitter = InstanceSplitter(
            target_field=FieldName.TARGET,
            is_pad_field=FieldName.IS_PAD,
            start_field=FieldName.START,
            forecast_start_field=FieldName.FORECAST_START,
            instance_sampler=ExpectedNumInstanceSampler(
                num_instances=1, min_future=self.prediction_length
            ),
            past_length=self.context_length,
            future_length=self.prediction_length,
        )
        input_names = get_hybrid_forward_input_names(MyProbTrainNetwork)
        return TrainDataLoader(
            dataset=dataset,
            transform=instance_splitter + SelectFields(input_names),
            batch_size=self.batch_size,
            stack_fn=partial(batchify, ctx=self.trainer.ctx, dtype=self.dtype),
            **kwargs,
        )

    def create_training_network(self) -> MyProbTrainNetwork:
        return MyProbTrainNetwork(
            prediction_length=self.prediction_length,
            distr_output=self.distr_output,
            num_cells=self.num_cells,
            num_sample_paths=self.num_sample_paths,
        )

    def create_predictor(
        self, transformation: Transformation, trained_network: HybridBlock
    ) -> Predictor:
        prediction_splitter = InstanceSplitter(
            target_field=FieldName.TARGET,
            is_pad_field=FieldName.IS_PAD,
            start_field=FieldName.START,
            forecast_start_field=FieldName.FORECAST_START,
            instance_sampler=TestSplitSampler(),
            past_length=self.context_length,
            future_length=self.prediction_length,
        )

        prediction_network = MyProbPredNetwork(
            prediction_length=self.prediction_length,
            distr_output=self.distr_output,
            num_cells=self.num_cells,
            num_sample_paths=self.num_sample_paths,
        )

        copy_parameters(trained_network, prediction_network)

        return RepresentableBlockPredictor(
            input_transform=transformation + prediction_splitter,
            prediction_net=prediction_network,
            batch_size=self.batch_size,
            prediction_length=self.prediction_length,
            ctx=self.trainer.ctx,
        )
[72]:
estimator = MyProbEstimator(
    prediction_length=custom_ds_metadata["prediction_length"],
    context_length=2 * custom_ds_metadata["prediction_length"],
    distr_output=GaussianOutput(),
    num_cells=40,
    trainer=Trainer(
        ctx="cpu",
        epochs=5,
        learning_rate=1e-3,
        hybridize=False,
        num_batches_per_epoch=100,
    ),
)
[73]:
predictor = estimator.train(train_ds)
100%|██████████| 100/100 [00:00<00:00, 207.20it/s, epoch=1/5, avg_epoch_loss=0.793]
100%|██████████| 100/100 [00:00<00:00, 214.08it/s, epoch=2/5, avg_epoch_loss=0.399]
100%|██████████| 100/100 [00:00<00:00, 215.39it/s, epoch=3/5, avg_epoch_loss=0.371]
100%|██████████| 100/100 [00:00<00:00, 193.17it/s, epoch=4/5, avg_epoch_loss=0.351]
100%|██████████| 100/100 [00:00<00:00, 196.49it/s, epoch=5/5, avg_epoch_loss=0.343]
[74]:
forecast_it, ts_it = make_evaluation_predictions(
    dataset=test_ds,  # test dataset
    predictor=predictor,  # predictor
    num_samples=100,  # number of sample paths we want for evaluation
)
[75]:
forecasts = list(forecast_it)
tss = list(ts_it)
[76]:
plt.plot(tss[0][-150:].to_timestamp())
forecasts[0].plot(show_label=True)
plt.legend()
[76]:
<matplotlib.legend.Legend at 0x7effa47204c0>
../../_images/tutorials_forecasting_extended_tutorial_114_1.png

Add features and scaling#

In the previous networks we used only the target and did not leverage any of the features of the dataset. Here we expand the probabilistic network by including the feat_dynamic_real field of the dataset that could enhance the forecasting power of our model. We achieve this by concatenating the target and the features to an enhanced vector that forms the new network input.

All the features that are available in a dataset can be potentially used as inputs to our model. However, for the purposes of this example we will restrict ourselves to using only one feature.

An important issue that a practitioner often needs to deal with is the different orders of magnitude in the values of the time series in a dataset. It is extremely helpful for a model to be trained on, and to forecast, values that lie roughly in the same value range. To address this issue, we add a Scaler to our model, which computes the scale of each time series. We can then scale the values of the time series and any related features accordingly and use these as inputs to the network.
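As a quick illustration of what the scaler computes, here is a small sketch using the MeanScaler imported below (the toy values are arbitrary):

from gluonts.mx import MeanScaler

scaler = MeanScaler(keepdims=True)
toy_target = mx.nd.array([[1.0, 2.0, 3.0, 4.0]])  # (batch, time)
toy_observed = mx.nd.ones_like(toy_target)        # all values observed
scaled_target, scale = scaler(toy_target, toy_observed)
print(scale)          # mean absolute value of each series, shape (batch, 1)
print(scaled_target)  # target divided by its scale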

[77]:
from gluonts.mx import MeanScaler, NOPScaler
[78]:
class MyProbNetwork(gluon.HybridBlock):
    def __init__(
        self,
        prediction_length,
        context_length,
        distr_output,
        num_cells,
        num_sample_paths=100,
        scaling=True,
        **kwargs
    ) -> None:
        super().__init__(**kwargs)
        self.prediction_length = prediction_length
        self.context_length = context_length
        self.distr_output = distr_output
        self.num_cells = num_cells
        self.num_sample_paths = num_sample_paths
        self.proj_distr_args = distr_output.get_args_proj()
        self.scaling = scaling

        with self.name_scope():
            # Set up a 2-layer neural network whose output will be projected to the distribution parameters
            self.nn = mx.gluon.nn.HybridSequential()
            self.nn.add(mx.gluon.nn.Dense(units=self.num_cells, activation="relu"))
            self.nn.add(
                mx.gluon.nn.Dense(
                    units=self.prediction_length * self.num_cells, activation="relu"
                )
            )

            if scaling:
                self.scaler = MeanScaler(keepdims=True)
            else:
                self.scaler = NOPScaler(keepdims=True)

    def compute_scale(self, past_target, past_observed_values):
        # scale shape is (batch_size, 1)
        _, scale = self.scaler(
            past_target.slice_axis(axis=1, begin=-self.context_length, end=None),
            past_observed_values.slice_axis(
                axis=1, begin=-self.context_length, end=None
            ),
        )

        return scale


class MyProbTrainNetwork(MyProbNetwork):
    def hybrid_forward(
        self,
        F,
        past_target,
        future_target,
        past_observed_values,
        past_feat_dynamic_real,
    ):
        # compute scale
        scale = self.compute_scale(past_target, past_observed_values)

        # scale target and time features
        past_target_scale = F.broadcast_div(past_target, scale)
        past_feat_dynamic_real_scale = F.broadcast_div(
            past_feat_dynamic_real.squeeze(axis=-1), scale
        )

        # concatenate target and time features to use them as input to the network
        net_input = F.concat(past_target_scale, past_feat_dynamic_real_scale, dim=-1)

        # compute network output
        net_output = self.nn(net_input)

        # (batch, prediction_length * nn_features)  ->  (batch, prediction_length, nn_features)
        net_output = net_output.reshape(0, self.prediction_length, -1)

        # project network output to distribution parameters domain
        distr_args = self.proj_distr_args(net_output)

        # compute distribution
        distr = self.distr_output.distribution(distr_args, scale=scale)

        # negative log-likelihood
        loss = distr.loss(future_target)
        return loss


class MyProbPredNetwork(MyProbTrainNetwork):
    # The prediction network only receives the past_* fields and returns predictions
    def hybrid_forward(
        self, F, past_target, past_observed_values, past_feat_dynamic_real
    ):
        # repeat fields: from (batch_size, past_target_length) to
        # (batch_size * num_sample_paths, past_target_length)
        repeated_past_target = past_target.repeat(repeats=self.num_sample_paths, axis=0)
        repeated_past_observed_values = past_observed_values.repeat(
            repeats=self.num_sample_paths, axis=0
        )
        repeated_past_feat_dynamic_real = past_feat_dynamic_real.repeat(
            repeats=self.num_sample_paths, axis=0
        )

        # compute scale
        scale = self.compute_scale(repeated_past_target, repeated_past_observed_values)

        # scale repeated target and time features
        repeated_past_target_scale = F.broadcast_div(repeated_past_target, scale)
        repeated_past_feat_dynamic_real_scale = F.broadcast_div(
            repeated_past_feat_dynamic_real.squeeze(axis=-1), scale
        )

        # concatenate target and time features to use them as input to the network
        net_input = F.concat(
            repeated_past_target_scale, repeated_past_feat_dynamic_real_scale, dim=-1
        )

        # compute network output
        net_output = self.nn(net_input)

        # (batch * num_sample_paths, prediction_length * nn_features)  ->  (batch * num_sample_paths, prediction_length, nn_features)
        net_output = net_output.reshape(0, self.prediction_length, -1)

        # project network output to distribution parameters domain
        distr_args = self.proj_distr_args(net_output)

        # compute distribution
        distr = self.distr_output.distribution(distr_args, scale=scale)

        # get (batch_size * num_sample_paths, prediction_length) samples
        samples = distr.sample()

        # reshape from (batch_size * num_sample_paths, prediction_length) to
        # (batch_size, num_sample_paths, prediction_length)
        return samples.reshape(
            shape=(-1, self.num_sample_paths, self.prediction_length)
        )
[79]:
class MyProbEstimator(GluonEstimator):
    @validated()
    def __init__(
        self,
        prediction_length: int,
        context_length: int,
        distr_output: DistributionOutput,
        num_cells: int,
        num_sample_paths: int = 100,
        scaling: bool = True,
        batch_size: int = 32,
        trainer: Trainer = Trainer(),
    ) -> None:
        super().__init__(trainer=trainer, batch_size=batch_size)
        self.prediction_length = prediction_length
        self.context_length = context_length
        self.distr_output = distr_output
        self.num_cells = num_cells
        self.num_sample_paths = num_sample_paths
        self.scaling = scaling

    def create_transformation(self):
        # Feature transformation that the model uses for input.
        return AddObservedValuesIndicator(
            target_field=FieldName.TARGET,
            output_field=FieldName.OBSERVED_VALUES,
        )

    def create_training_data_loader(self, dataset, **kwargs):
        instance_splitter = InstanceSplitter(
            target_field=FieldName.TARGET,
            is_pad_field=FieldName.IS_PAD,
            start_field=FieldName.START,
            forecast_start_field=FieldName.FORECAST_START,
            instance_sampler=ExpectedNumInstanceSampler(
                num_instances=1, min_future=self.prediction_length
            ),
            past_length=self.context_length,
            future_length=self.prediction_length,
            time_series_fields=[
                FieldName.FEAT_DYNAMIC_REAL,
                FieldName.OBSERVED_VALUES,
            ],
        )
        input_names = get_hybrid_forward_input_names(MyProbTrainNetwork)
        return TrainDataLoader(
            dataset=dataset,
            transform=instance_splitter + SelectFields(input_names),
            batch_size=self.batch_size,
            stack_fn=partial(batchify, ctx=self.trainer.ctx, dtype=self.dtype),
            **kwargs,
        )

    def create_training_network(self) -> MyProbTrainNetwork:
        return MyProbTrainNetwork(
            prediction_length=self.prediction_length,
            context_length=self.context_length,
            distr_output=self.distr_output,
            num_cells=self.num_cells,
            num_sample_paths=self.num_sample_paths,
            scaling=self.scaling,
        )

    def create_predictor(
        self, transformation: Transformation, trained_network: HybridBlock
    ) -> Predictor:
        prediction_splitter = InstanceSplitter(
            target_field=FieldName.TARGET,
            is_pad_field=FieldName.IS_PAD,
            start_field=FieldName.START,
            forecast_start_field=FieldName.FORECAST_START,
            instance_sampler=TestSplitSampler(),
            past_length=self.context_length,
            future_length=self.prediction_length,
            time_series_fields=[
                FieldName.FEAT_DYNAMIC_REAL,
                FieldName.OBSERVED_VALUES,
            ],
        )

        prediction_network = MyProbPredNetwork(
            prediction_length=self.prediction_length,
            context_length=self.context_length,
            distr_output=self.distr_output,
            num_cells=self.num_cells,
            num_sample_paths=self.num_sample_paths,
            scaling=self.scaling,
        )

        copy_parameters(trained_network, prediction_network)

        return RepresentableBlockPredictor(
            input_transform=transformation + prediction_splitter,
            prediction_net=prediction_network,
            batch_size=self.batch_size,
            prediction_length=self.prediction_length,
            ctx=self.trainer.ctx,
        )
[80]:
estimator = MyProbEstimator(
    prediction_length=custom_ds_metadata["prediction_length"],
    context_length=2 * custom_ds_metadata["prediction_length"],
    distr_output=GaussianOutput(),
    num_cells=40,
    trainer=Trainer(
        ctx="cpu",
        epochs=5,
        learning_rate=1e-3,
        hybridize=False,
        num_batches_per_epoch=100,
    ),
)
[81]:
predictor = estimator.train(train_ds)
100%|██████████| 100/100 [00:00<00:00, 131.84it/s, epoch=1/5, avg_epoch_loss=0.967]
100%|██████████| 100/100 [00:00<00:00, 126.56it/s, epoch=2/5, avg_epoch_loss=0.536]
100%|██████████| 100/100 [00:00<00:00, 126.96it/s, epoch=3/5, avg_epoch_loss=0.483]
100%|██████████| 100/100 [00:00<00:00, 130.39it/s, epoch=4/5, avg_epoch_loss=0.455]
100%|██████████| 100/100 [00:00<00:00, 112.58it/s, epoch=5/5, avg_epoch_loss=0.431]
[82]:
forecast_it, ts_it = make_evaluation_predictions(
    dataset=test_ds,  # test dataset
    predictor=predictor,  # predictor
    num_samples=100,  # number of sample paths we want for evaluation
)
[83]:
forecasts = list(forecast_it)
tss = list(ts_it)
[84]:
plt.plot(tss[0][-150:].to_timestamp())
forecasts[0].plot(show_label=True)
plt.legend()
[84]:
<matplotlib.legend.Legend at 0x7effc7997d60>
../../_images/tutorials_forecasting_extended_tutorial_123_1.png

From feedforward to RNN#

In all the previous examples we have used a feedforward neural network as the base of our forecasting model. The main idea was to use a window of the time series (of length context_length) as input to the network and train it to forecast the following window (of length prediction_length).

In this section we will replace the feedforward network with a recurrent neural network (RNN). Due to the different nature of RNNs, the structure of the networks will be a bit different. Let’s see what the major changes are.

Training#

The main idea behind the RNN is the same as in the feedforward networks we already constructed: as we unroll the RNN, at each time step we use past values of the time series as input and forecast the next value. We can enhance the input by using multiple past values (for example, specific lags based on seasonality patterns) or available features. However, in this example we keep things simple and just use the last value of the time series. The output of the network at each time step is the distribution of the value at the next time step, where the state of the RNN is used as the feature vector for the parameter projection of the distribution.

Due to the sequential nature of the RNN, the distinction between past_ and future_ in the cut window of the time series is not really necessary. Therefore, we can concatenate past_target and future_target and treat them as a single target window that we wish to forecast. This means that the input to the RNN is (sequentially) the window target[-(context_length + prediction_length + 1):-1], i.e., the window we want to predict shifted back by one time step. As a consequence, we need context_length + prediction_length + 1 available values in each window that we cut. We can define this in the InstanceSplitter.

Overall, during training the steps are the following:

  • We pass the target values target[-(context_length + prediction_length + 1):-1] sequentially through the RNN

  • We use the state of the RNN at each time step as a feature vector and project it to the distribution parameter domain

  • The output at each time step is the distribution of the value of the next time step, which overall is the forecasted distribution for the window target[-(context_length + prediction_length):]

The above steps are implemented in the unroll_encoder method. The toy sketch below illustrates how the input and output windows line up.
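
To make the window arithmetic concrete, here is a tiny numpy sketch; the toy series and the values of context_length and prediction_length are hypothetical and chosen only to illustrate the slicing described above.

import numpy as np

# hypothetical toy series and window sizes
toy_target = np.arange(10.0)          # values 0, 1, ..., 9
context_length, prediction_length = 3, 2

target_in = toy_target[-(context_length + prediction_length + 1):-1]   # [4. 5. 6. 7. 8.]
target_out = toy_target[-(context_length + prediction_length):]        # [5. 6. 7. 8. 9.]
# target_out[t] is exactly one step ahead of target_in[t]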

Inference#

During inference we only know the values of past_target, so we cannot follow exactly the same steps as in training. However, the main idea is very similar:

  • We pass the past target values past_target[-(context_length + 1):] sequentially through the RNN, which effectively updates the state of the RNN

  • At the last time step, the output of the RNN is effectively the distribution of the next value of the time series (which we do not know). Therefore, we sample from this distribution (num_sample_paths times) and use the samples as inputs to the RNN for the next time step

  • We repeat the previous step prediction_length times

The first step is implemented in the unroll_encoder method and the remaining steps in the sample_decoder method.
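
Before looking at the full MXNet implementation, here is a minimal, purely conceptual numpy sketch of the sampling loop in sample_decoder. It omits the RNN state entirely and uses a dummy Gaussian centred on the previous input as a stand-in for the distribution obtained by projecting the RNN state; all names and values are made up for the illustration.

import numpy as np

# conceptual sketch only: no RNN state, a dummy Gaussian replaces the projected distribution
rng = np.random.default_rng(0)
num_sample_paths, prediction_length = 3, 4

# start from the last observed value, repeated once per sample path
decoder_input = np.full(num_sample_paths, 10.0)

future_samples = []
for _ in range(prediction_length):
    # stand-in for "unroll the RNN one step and project its state to distribution parameters"
    new_samples = rng.normal(loc=decoder_input, scale=1.0)
    future_samples.append(new_samples)   # keep the samples of this time step
    decoder_input = new_samples          # the samples are the inputs of the next step

samples = np.stack(future_samples, axis=1)   # shape (num_sample_paths, prediction_length)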

[85]:
class MyProbRNN(gluon.HybridBlock):
    def __init__(
        self,
        prediction_length,
        context_length,
        distr_output,
        num_cells,
        num_layers,
        num_sample_paths=100,
        scaling=True,
        **kwargs
    ) -> None:
        super().__init__(**kwargs)
        self.prediction_length = prediction_length
        self.context_length = context_length
        self.distr_output = distr_output
        self.num_cells = num_cells
        self.num_layers = num_layers
        self.num_sample_paths = num_sample_paths
        self.proj_distr_args = distr_output.get_args_proj()
        self.scaling = scaling

        with self.name_scope():
            self.rnn = mx.gluon.rnn.HybridSequentialRNNCell()
            for k in range(self.num_layers):
                cell = mx.gluon.rnn.LSTMCell(hidden_size=self.num_cells)
                cell = mx.gluon.rnn.ResidualCell(cell) if k > 0 else cell
                self.rnn.add(cell)

            if scaling:
                self.scaler = MeanScaler(keepdims=True)
            else:
                self.scaler = NOPScaler(keepdims=True)

    def compute_scale(self, past_target, past_observed_values):
        # the scale is computed over the last context_length values of the past target
        # scale shape is (batch_size, 1, *target_shape)
        _, scale = self.scaler(
            past_target.slice_axis(axis=1, begin=-self.context_length, end=None),
            past_observed_values.slice_axis(
                axis=1, begin=-self.context_length, end=None
            ),
        )

        return scale

    def unroll_encoder(
        self,
        F,
        past_target,
        past_observed_values,
        future_target=None,
        future_observed_values=None,
    ):
        # overall target field
        # input target from -(context_length + prediction_length + 1) to -1
        if future_target is not None:  # during training
            target_in = F.concat(past_target, future_target, dim=-1).slice_axis(
                axis=1,
                begin=-(self.context_length + self.prediction_length + 1),
                end=-1,
            )

            # overall observed_values field
            # input observed_values corresponding to target_in
            observed_values_in = F.concat(
                past_observed_values, future_observed_values, dim=-1
            ).slice_axis(
                axis=1,
                begin=-(self.context_length + self.prediction_length + 1),
                end=-1,
            )

            rnn_length = self.context_length + self.prediction_length
        else:  # during inference
            target_in = past_target.slice_axis(
                axis=1, begin=-(self.context_length + 1), end=-1
            )

            # overall observed_values field
            # input observed_values corresponding to target_in
            observed_values_in = past_observed_values.slice_axis(
                axis=1, begin=-(self.context_length + 1), end=-1
            )

            rnn_length = self.context_length

        # compute scale
        scale = self.compute_scale(target_in, observed_values_in)

        # scale target_in
        target_in_scale = F.broadcast_div(target_in, scale)

        # compute network output
        net_output, states = self.rnn.unroll(
            inputs=target_in_scale,
            length=rnn_length,
            layout="NTC",
            merge_outputs=True,
        )

        return net_output, states, scale


class MyProbTrainRNN(MyProbRNN):
    def hybrid_forward(
        self,
        F,
        past_target,
        future_target,
        past_observed_values,
        future_observed_values,
    ):
        net_output, _, scale = self.unroll_encoder(
            F, past_target, past_observed_values, future_target, future_observed_values
        )

        # output target from -(context_length + prediction_length) to end
        target_out = F.concat(past_target, future_target, dim=-1).slice_axis(
            axis=1, begin=-(self.context_length + self.prediction_length), end=None
        )

        # project network output to distribution parameters domain
        distr_args = self.proj_distr_args(net_output)

        # compute distribution
        distr = self.distr_output.distribution(distr_args, scale=scale)

        # negative log-likelihood
        loss = distr.loss(target_out)
        return loss


class MyProbPredRNN(MyProbTrainRNN):
    def sample_decoder(self, F, past_target, states, scale):
        # repeat fields: from (batch_size, past_target_length) to
        # (batch_size * num_sample_paths, past_target_length)
        repeated_states = [
            s.repeat(repeats=self.num_sample_paths, axis=0) for s in states
        ]
        repeated_scale = scale.repeat(repeats=self.num_sample_paths, axis=0)

        # first decoder input is the last value of past_target, i.e.,
        # the value immediately preceding the first time step we want to forecast
        decoder_input = past_target.slice_axis(axis=1, begin=-1, end=None).repeat(
            repeats=self.num_sample_paths, axis=0
        )

        # list with samples at each time step
        future_samples = []

        # for each future time step we draw new samples for this time step and update the state
        # the drawn samples are the inputs to the rnn at the next time step
        for k in range(self.prediction_length):
            rnn_outputs, repeated_states = self.rnn.unroll(
                inputs=decoder_input,
                length=1,
                begin_state=repeated_states,
                layout="NTC",
                merge_outputs=True,
            )

            # project network output to distribution parameters domain
            distr_args = self.proj_distr_args(rnn_outputs)

            # compute distribution
            distr = self.distr_output.distribution(distr_args, scale=repeated_scale)

            # draw samples (batch_size * num_samples, 1)
            new_samples = distr.sample()

            # append the samples of the current time step
            future_samples.append(new_samples)

            # update decoder input for the next time step
            decoder_input = new_samples

        samples = F.concat(*future_samples, dim=1)

        # (batch_size, num_samples, prediction_length)
        return samples.reshape(
            shape=(-1, self.num_sample_paths, self.prediction_length)
        )

    def hybrid_forward(self, F, past_target, past_observed_values):
        # unroll encoder over context_length
        net_output, states, scale = self.unroll_encoder(
            F, past_target, past_observed_values
        )

        samples = self.sample_decoder(F, past_target, states, scale)

        return samples
[86]:
class MyProbRNNEstimator(GluonEstimator):
    @validated()
    def __init__(
        self,
        prediction_length: int,
        context_length: int,
        distr_output: DistributionOutput,
        num_cells: int,
        num_layers: int,
        num_sample_paths: int = 100,
        scaling: bool = True,
        batch_size: int = 32,
        trainer: Trainer = Trainer(),
    ) -> None:
        super().__init__(trainer=trainer, batch_size=batch_size)
        self.prediction_length = prediction_length
        self.context_length = context_length
        self.distr_output = distr_output
        self.num_cells = num_cells
        self.num_layers = num_layers
        self.num_sample_paths = num_sample_paths
        self.scaling = scaling

    def create_transformation(self):
        # Feature transformation that the model uses for input.
        return AddObservedValuesIndicator(
            target_field=FieldName.TARGET,
            output_field=FieldName.OBSERVED_VALUES,
        )

    def create_training_data_loader(self, dataset, **kwargs):
        instance_splitter = InstanceSplitter(
            target_field=FieldName.TARGET,
            is_pad_field=FieldName.IS_PAD,
            start_field=FieldName.START,
            forecast_start_field=FieldName.FORECAST_START,
            instance_sampler=ExpectedNumInstanceSampler(
                num_instances=1,
                min_future=self.prediction_length,
            ),
            past_length=self.context_length + 1,
            future_length=self.prediction_length,
            time_series_fields=[
                FieldName.FEAT_DYNAMIC_REAL,
                FieldName.OBSERVED_VALUES,
            ],
        )
        input_names = get_hybrid_forward_input_names(MyProbTrainRNN)
        return TrainDataLoader(
            dataset=dataset,
            transform=instance_splitter + SelectFields(input_names),
            batch_size=self.batch_size,
            stack_fn=partial(batchify, ctx=self.trainer.ctx, dtype=self.dtype),
            **kwargs,
        )

    def create_training_network(self) -> MyProbTrainRNN:
        return MyProbTrainRNN(
            prediction_length=self.prediction_length,
            context_length=self.context_length,
            distr_output=self.distr_output,
            num_cells=self.num_cells,
            num_layers=self.num_layers,
            num_sample_paths=self.num_sample_paths,
            scaling=self.scaling,
        )

    def create_predictor(
        self, transformation: Transformation, trained_network: HybridBlock
    ) -> Predictor:
        prediction_splitter = InstanceSplitter(
            target_field=FieldName.TARGET,
            is_pad_field=FieldName.IS_PAD,
            start_field=FieldName.START,
            forecast_start_field=FieldName.FORECAST_START,
            instance_sampler=TestSplitSampler(),
            past_length=self.context_length + 1,
            future_length=self.prediction_length,
            time_series_fields=[
                FieldName.FEAT_DYNAMIC_REAL,
                FieldName.OBSERVED_VALUES,
            ],
        )
        prediction_network = MyProbPredRNN(
            prediction_length=self.prediction_length,
            context_length=self.context_length,
            distr_output=self.distr_output,
            num_cells=self.num_cells,
            num_layers=self.num_layers,
            num_sample_paths=self.num_sample_paths,
            scaling=self.scaling,
        )

        copy_parameters(trained_network, prediction_network)

        return RepresentableBlockPredictor(
            input_transform=transformation + prediction_splitter,
            prediction_net=prediction_network,
            batch_size=self.batch_size,
            prediction_length=self.prediction_length,
            ctx=self.trainer.ctx,
        )
[87]:
estimator = MyProbRNNEstimator(
    prediction_length=24,
    context_length=48,
    num_cells=40,
    num_layers=2,
    distr_output=GaussianOutput(),
    trainer=Trainer(
        ctx="cpu",
        epochs=5,
        learning_rate=1e-3,
        hybridize=False,
        num_batches_per_epoch=100,
    ),
)
[88]:
predictor = estimator.train(train_ds)
100%|██████████| 100/100 [00:17<00:00,  5.80it/s, epoch=1/5, avg_epoch_loss=0.752]
100%|██████████| 100/100 [00:16<00:00,  6.01it/s, epoch=2/5, avg_epoch_loss=0.394]
100%|██████████| 100/100 [00:17<00:00,  5.87it/s, epoch=3/5, avg_epoch_loss=0.342]
100%|██████████| 100/100 [00:17<00:00,  5.83it/s, epoch=4/5, avg_epoch_loss=0.294]
100%|██████████| 100/100 [00:16<00:00,  6.15it/s, epoch=5/5, avg_epoch_loss=0.268]
[89]:
forecast_it, ts_it = make_evaluation_predictions(
    dataset=test_ds,  # test dataset
    predictor=predictor,  # predictor
    num_samples=100,  # number of sample paths we want for evaluation
)
[90]:
forecasts = list(forecast_it)
tss = list(ts_it)
[91]:
plt.plot(tss[0][-150:].to_timestamp())
forecasts[0].plot(show_label=True)
plt.legend()
[91]:
<matplotlib.legend.Legend at 0x7effa4f01850>
../../_images/tutorials_forecasting_extended_tutorial_131_1.png