
Writing forecasting models in GluonTS with PyTorch

This notebook illustrates how one can implement a time series model in GluonTS using PyTorch, train it with PyTorch Lightning, and use it together with the rest of the GluonTS ecosystem for data loading, feature processing, and model evaluation.

In [1]:
from typing import List, Optional, Callable, Iterable
from itertools import islice
In [2]:
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
import matplotlib.dates as mdates

For this example we will use the “electricity” dataset, which can be loaded as follows.

In [3]:
from gluonts.dataset.repository.datasets import get_dataset
In [4]:
dataset = get_dataset("electricity")
saving time-series into /home/runner/.mxnet/gluon-ts/datasets/electricity/train/data.json
saving time-series into /home/runner/.mxnet/gluon-ts/datasets/electricity/test/data.json
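
The dataset object also carries metadata that we rely on below; inspecting it is a quick way to confirm the frequency and forecast horizon (an illustrative check, values as produced by get_dataset("electricity")):

print(dataset.metadata.freq)               # 1H
print(dataset.metadata.prediction_length)  # 24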

This is what the first nine time series from the training portion of the dataset look like:

In [5]:
date_formatter = mdates.DateFormatter('%Y')

fig = plt.figure(figsize=(12,8))
for idx, entry in enumerate(islice(dataset.train, 9)):
    ax = plt.subplot(3, 3, idx+1)
    t = pd.date_range(start=entry["start"], periods=len(entry["target"]), freq=entry["start"].freq)
    plt.plot(t, entry["target"])
    plt.xticks(pd.date_range(start=pd.to_datetime("2011-12-31"), periods=3, freq="AS"))
    ax.xaxis.set_major_formatter(date_formatter)
[Figure: the first nine training time series from the electricity dataset]

Probabilistic feed-forward network using PyTorch

We will use a pretty simple model, based on a feed-forward network whose output layer produces the parameters of a parametric distribution. By default, the model will use a Student’s t-distribution, but this can be easily customized via the distr_output constructor argument.

In [6]:
import torch
import torch.nn as nn
In [7]:
from gluonts.torch.model.predictor import PyTorchPredictor
from gluonts.torch.modules.distribution_output import StudentTOutput
from gluonts.model.forecast_generator import DistributionForecastGenerator
In [8]:
def mean_abs_scaling(context, min_scale=1e-5):
    return context.abs().mean(1).clamp(min_scale, None).unsqueeze(1)
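
To see the convention this scaler follows, here is a quick illustrative check (input values are hypothetical): for an input of shape (batch, context_length) it returns a (batch, 1) tensor of per-series mean absolute values, clamped away from zero so that an all-zero series does not lead to a division by zero.

mean_abs_scaling(torch.tensor([[1.0, -3.0, 2.0], [0.0, 0.0, 0.0]]))
# tensor([[2.0000e+00],
#         [1.0000e-05]])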
In [9]:
class FeedForwardNetwork(nn.Module):
    def __init__(
        self,
        freq: str,
        prediction_length: int,
        context_length: int,
        hidden_dimensions: List[int],
        distr_output=StudentTOutput(),
        batch_norm: bool = False,
        scaling: Callable = mean_abs_scaling,
    ) -> None:
        super().__init__()

        assert prediction_length > 0
        assert context_length > 0
        assert len(hidden_dimensions) > 0

        self.freq = freq
        self.prediction_length = prediction_length
        self.context_length = context_length
        self.hidden_dimensions = hidden_dimensions
        self.distr_output = distr_output
        self.batch_norm = batch_norm
        self.scaling = scaling

        dimensions = [context_length] + hidden_dimensions[:-1]

        modules = []
        for in_size, out_size in zip(dimensions[:-1], dimensions[1:]):
            modules += [self.__make_lin(in_size, out_size), nn.ReLU()]
            if batch_norm:
                modules.append(nn.BatchNorm1d(out_size))
        modules.append(self.__make_lin(dimensions[-1], prediction_length * hidden_dimensions[-1]))

        self.nn = nn.Sequential(*modules)
        self.args_proj = self.distr_output.get_args_proj(hidden_dimensions[-1])

    @staticmethod
    def __make_lin(dim_in, dim_out):
        lin = nn.Linear(dim_in, dim_out)
        torch.nn.init.uniform_(lin.weight, -0.07, 0.07)
        torch.nn.init.zeros_(lin.bias)
        return lin

    def forward(self, context):
        scale = self.scaling(context)
        scaled_context = context / scale
        nn_out = self.nn(scaled_context)
        nn_out_reshaped = nn_out.reshape(-1, self.prediction_length, self.hidden_dimensions[-1])
        distr_args = self.args_proj(nn_out_reshaped)
        return distr_args, torch.zeros_like(scale), scale

    def get_predictor(self, input_transform, batch_size=32, device=None):
        return PyTorchPredictor(
            prediction_length=self.prediction_length,
            freq=self.freq,
            input_names=["past_target"],
            prediction_net=self,
            batch_size=batch_size,
            input_transform=input_transform,
            forecast_generator=DistributionForecastGenerator(self.distr_output),
            device=device,
        )
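
As a quick smoke test of the shapes involved (a hedged sketch, not part of the original notebook; names are illustrative):

toy_net = FeedForwardNetwork(
    freq="1H", prediction_length=24, context_length=336, hidden_dimensions=[96, 48]
)
distr_args, loc, scale = toy_net(torch.rand(4, 336))
# loc and scale have shape (4, 1); each tensor in distr_args has shape
# (4, 24): one Student's t parameter (df, loc, scale) per forecast step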

To train the model using PyTorch Lightning, we only need to extend the class with methods that specify how training steps are supposed to work. Please refer to the PyTorch Lightning documentation to learn more about the interface you need to implement in order to fully customize the training procedure.

In [10]:
import pytorch_lightning as pl
In [11]:
class LightningFeedForwardNetwork(FeedForwardNetwork, pl.LightningModule):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def training_step(self, batch, batch_idx):
        context = batch["past_target"]
        target = batch["future_target"]

        assert context.shape[-1] == self.context_length
        assert target.shape[-1] == self.prediction_length

        distr_args, loc, scale = self(context)
        distr = self.distr_output.distribution(distr_args, loc, scale)
        loss = -distr.log_prob(target)

        return loss.mean()

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)
        return optimizer
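
If you also want the loss to appear in Lightning's progress bar and logs, one option (assuming a pytorch_lightning version that provides self.log, i.e. >= 1.0) is a thin subclass that logs what training_step returns:

class LoggingFeedForwardNetwork(LightningFeedForwardNetwork):
    def training_step(self, batch, batch_idx):
        loss = super().training_step(batch, batch_idx)
        self.log("train_loss", loss, prog_bar=True)  # assumes self.log is available
        return loss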

We can now instantiate the training network, and explore its set of parameters.

In [12]:
freq = "1H"
context_length = 2 * 7 * 24
prediction_length = dataset.metadata.prediction_length
hidden_dimensions = [96, 48]
In [13]:
net = LightningFeedForwardNetwork(
    freq=freq,
    prediction_length=prediction_length,
    context_length=context_length,
    hidden_dimensions=hidden_dimensions,
    distr_output=StudentTOutput(),
)
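
Swapping the output distribution is a one-line change. For instance, assuming NormalOutput is also available in gluonts.torch.modules.distribution_output (it is in recent GluonTS releases), a Gaussian variant of the same network would be:

from gluonts.torch.modules.distribution_output import NormalOutput

net_gaussian = LightningFeedForwardNetwork(
    freq=freq,
    prediction_length=prediction_length,
    context_length=context_length,
    hidden_dimensions=hidden_dimensions,
    distr_output=NormalOutput(),
)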
In [14]:
sum(np.prod(p.shape) for p in net.parameters())
Out[14]:
144243
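
This count can be verified by hand: with context_length = 336, prediction_length = 24 and hidden_dimensions = [96, 48], the network consists of Linear(336, 96), Linear(96, 24 * 48), and the Student's t head projecting the 48 features to its three parameters:

first = 336 * 96 + 96              # first hidden layer: 32,352
second = 96 * (24 * 48) + 24 * 48  # output layer: 111,744
head = 3 * (48 + 1)                # three Linear(48, 1) projections: 147
first + second + head              # 144,243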
In [15]:
for p in net.parameters():
    print(p.shape)
torch.Size([96, 336])
torch.Size([96])
torch.Size([1152, 96])
torch.Size([1152])
torch.Size([1, 48])
torch.Size([1])
torch.Size([1, 48])
torch.Size([1])
torch.Size([1, 48])
torch.Size([1])

Defining the training data loader

We now set up the data loader which will yield batches of data to train on. Starting from the original dataset, the data loader is configured to apply the following transformation, which does essentially two things:

* Replaces NaNs in the target field with a dummy value (zero), and adds a field indicating which values were actually observed vs. imputed this way.
* Slices out training instances of a fixed length randomly from the given dataset; these will be stacked into batches by the data loader itself.

In [16]:
from gluonts.dataset.field_names import FieldName
from gluonts.transform import AddObservedValuesIndicator, InstanceSplitter, ExpectedNumInstanceSampler, TestSplitSampler
In [17]:
mask_unobserved = AddObservedValuesIndicator(
    target_field=FieldName.TARGET,
    output_field=FieldName.OBSERVED_VALUES,
)
In [18]:
training_splitter = InstanceSplitter(
    target_field=FieldName.TARGET,
    is_pad_field=FieldName.IS_PAD,
    start_field=FieldName.START,
    forecast_start_field=FieldName.FORECAST_START,
    instance_sampler=ExpectedNumInstanceSampler(
        num_instances=1,
        min_future=prediction_length,
    ),
    past_length=context_length,
    future_length=prediction_length,
    time_series_fields=[FieldName.OBSERVED_VALUES],
)
In [19]:
from gluonts.dataset.loader import TrainDataLoader
from gluonts.itertools import Cached
from gluonts.torch.batchify import batchify
In [20]:
batch_size = 32
num_batches_per_epoch = 50
In [21]:
data_loader = TrainDataLoader(
    # We cache the dataset, to make training faster
    Cached(dataset.train),
    batch_size=batch_size,
    stack_fn=batchify,
    transform=mask_unobserved + training_splitter,
    num_batches_per_epoch=num_batches_per_epoch,
)
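
Pulling a single batch out of the loader is an easy way to confirm the field names and shapes the model expects (an illustrative check; the shapes follow from batch_size=32, context_length=336, prediction_length=24):

batch = next(iter(data_loader))
batch["past_target"].shape    # torch.Size([32, 336])
batch["future_target"].shape  # torch.Size([32, 24])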

Train the model

We can now train the model using the tooling that PyTorch Lightning provides:

In [22]:
trainer = pl.Trainer(max_epochs=10, gpus=-1 if torch.cuda.is_available() else None)
trainer.fit(net, train_dataloader=data_loader)
INFO:pytorch_lightning.utilities.distributed:GPU available: False, used: False
INFO:pytorch_lightning.utilities.distributed:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.core.lightning:
  | Name      | Type       | Params
-----------------------------------------
0 | nn        | Sequential | 144 K
1 | args_proj | PtArgProj  | 147
-----------------------------------------
144 K     Trainable params
0         Non-trainable params
144 K     Total params
0.577     Total estimated model params size (MB)
Epoch 9: : 50it [00:00, 61.31it/s, loss=5.89, v_num=0]
Out[22]:
1
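
Note that the Trainer arguments above match the pytorch_lightning version this notebook was run with; from version 1.5 on, fit() takes train_dataloaders instead of train_dataloader, and later releases replace gpus= with accelerator=/devices=. An equivalent call on a recent version would look like this (a hedged sketch, adjust to your installed version):

trainer = pl.Trainer(max_epochs=10, accelerator="auto")
trainer.fit(net, train_dataloaders=data_loader)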

Create a predictor from the trained model, and test it

Now we can get the predictor out of our model, and use it to make forecasts.

In [23]:
prediction_splitter = InstanceSplitter(
    target_field=FieldName.TARGET,
    is_pad_field=FieldName.IS_PAD,
    start_field=FieldName.START,
    forecast_start_field=FieldName.FORECAST_START,
    instance_sampler=TestSplitSampler(),
    past_length=context_length,
    future_length=prediction_length,
    time_series_fields=[FieldName.OBSERVED_VALUES],
)
In [24]:
predictor_pytorch = net.get_predictor(mask_unobserved + prediction_splitter)
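
Like any GluonTS predictor, this one can be serialized to disk and loaded back later via the standard predictor interface (the path below is illustrative):

from pathlib import Path
from gluonts.model.predictor import Predictor

model_path = Path("/tmp/feedforward-predictor")
model_path.mkdir(exist_ok=True)
predictor_pytorch.serialize(model_path)
predictor_loaded = Predictor.deserialize(model_path)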

For example, we can do backtesting on the test dataset: in what follows, make_evaluation_predictions will slice out the trailing prediction_length observations from the test time series, and use the given predictor to obtain forecasts for the same time range.

In [25]:
from gluonts.evaluation import make_evaluation_predictions, Evaluator
In [26]:
forecast_it, ts_it = make_evaluation_predictions(
    dataset=dataset.test, predictor=predictor_pytorch
)

forecasts_pytorch = list(f.to_sample_forecast() for f in forecast_it)
tss_pytorch = list(ts_it)
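
Each element of forecasts_pytorch is a SampleForecast; besides plotting, it exposes the forecast start date, the per-step mean, and arbitrary quantiles (an illustrative peek):

forecast = forecasts_pytorch[0]
forecast.start_date     # first timestamp of the 24-step forecast window
forecast.mean.shape     # (24,)
forecast.quantile(0.9)  # 90th-percentile path, array of length 24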

Once we have the forecasts, we can plot them:

In [27]:
plt.figure(figsize=(20, 15))
date_formatter = mdates.DateFormatter('%b, %d')
plt.rcParams.update({'font.size': 15})

for idx, (forecast, ts) in islice(enumerate(zip(forecasts_pytorch, tss_pytorch)), 9):
    ax = plt.subplot(3, 3, idx+1)

    plt.plot(ts[-5 * prediction_length:], label="target")
    forecast.plot()
    plt.xticks(rotation=60)
    ax.xaxis.set_major_formatter(date_formatter)

plt.gcf().tight_layout()
plt.legend()
plt.show()
[Figure: forecasts (median and prediction intervals) vs. target for nine test series]

And we can compute evaluation metrics that summarize the performance of the model on our test data.

In [28]:
evaluator = Evaluator(quantiles=[0.1, 0.5, 0.9])
In [29]:
metrics_pytorch, _ = evaluator(iter(tss_pytorch), iter(forecasts_pytorch), num_series=len(dataset.test))
pd.DataFrame.from_records(metrics_pytorch, index=["FeedForward"]).transpose()
Running evaluation: 100%|██████████| 2247/2247 [00:00<00:00, 65514.13it/s]
Out[29]:
FeedForward
Coverage[0.1] 6.186026e-02
Coverage[0.5] 4.029076e-01
Coverage[0.9] 8.574210e-01
MAE_Coverage 5.927039e-02
MAPE 1.652528e-01
MASE 1.132887e+00
MSE 3.617722e+06
MSIS 9.098256e+00
ND 1.020945e-01
NRMSE 7.974063e-01
OWA NaN
QuantileLoss[0.1] 6.193642e+06
QuantileLoss[0.5] 1.313272e+07
QuantileLoss[0.9] 6.372190e+06
RMSE 1.902031e+03
abs_error 1.313272e+07
abs_target_mean 2.385272e+03
abs_target_sum 1.286330e+08
mean_absolute_QuantileLoss 8.566184e+06
mean_wQuantileLoss 6.659400e-02
sMAPE 1.564280e-01
seasonal_error 1.894934e+02
wQuantileLoss[0.1] 4.814973e-02
wQuantileLoss[0.5] 1.020945e-01
wQuantileLoss[0.9] 4.953777e-02
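
As a consistency check on the table: ND is abs_error divided by abs_target_sum (1.313272e+07 / 1.286330e+08 ≈ 0.1021), which is also why it coincides with wQuantileLoss[0.5], the weighted quantile loss at the median.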