# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
import pytorch_lightning as pl
import torch
from torch.optim.lr_scheduler import ReduceLROnPlateau
from gluonts.core.component import validated
from gluonts.torch.modules.loss import DistributionLoss, NegativeLogLikelihood
from gluonts.torch.util import weighted_average
from .module import DeepARModel
[docs]class DeepARLightningModule(pl.LightningModule):
"""
A ``pl.LightningModule`` class that can be used to train a
``DeepARModel`` with PyTorch Lightning.
This is a thin layer around a (wrapped) ``DeepARModel`` object,
that exposes the methods to evaluate training and validation loss.
Parameters
----------
model
``DeepARModel`` to be trained.
loss
Loss function to be used for training,
default: ``NegativeLogLikelihood()``.
lr
Learning rate, default: ``1e-3``.
weight_decay
Weight decay regularization parameter, default: ``1e-8``.
patience
Patience parameter for learning rate scheduler, default: ``10``.
"""
@validated()
def __init__(
self,
model: DeepARModel,
loss: DistributionLoss = NegativeLogLikelihood(),
lr: float = 1e-3,
weight_decay: float = 1e-8,
patience: int = 10,
) -> None:
super().__init__()
self.save_hyperparameters()
self.model = model
self.loss = loss
self.lr = lr
self.weight_decay = weight_decay
self.patience = patience
self.example_input_array = tuple(
[
torch.zeros(shape, dtype=self.model.input_types()[name])
for (name, shape) in self.model.input_shapes().items()
]
)
[docs] def forward(self, *args, **kwargs):
return self.model(*args, **kwargs)
def _compute_loss(self, batch):
feat_static_cat = batch["feat_static_cat"]
feat_static_real = batch["feat_static_real"]
past_time_feat = batch["past_time_feat"]
past_target = batch["past_target"]
future_time_feat = batch["future_time_feat"]
future_target = batch["future_target"]
past_observed_values = batch["past_observed_values"]
future_observed_values = batch["future_observed_values"]
params, scale, _, _, _ = self.model.unroll_lagged_rnn(
feat_static_cat,
feat_static_real,
past_time_feat,
past_target,
past_observed_values,
future_time_feat,
future_target,
)
distr = self.model.output_distribution(params, scale)
context_target = past_target[:, -self.model.context_length + 1 :]
target = torch.cat(
(context_target, future_target),
dim=1,
)
loss_values = self.loss(distr, target)
context_observed = past_observed_values[
:, -self.model.context_length + 1 :
]
observed_values = torch.cat(
(context_observed, future_observed_values), dim=1
)
if len(self.model.target_shape) == 0:
loss_weights = observed_values
else:
loss_weights, _ = observed_values.min(dim=-1, keepdim=False)
return weighted_average(loss_values, weights=loss_weights)
[docs] def training_step(self, batch, batch_idx: int): # type: ignore
"""
Execute training step.
"""
train_loss = self._compute_loss(batch)
self.log(
"train_loss",
train_loss,
on_epoch=True,
on_step=False,
prog_bar=True,
)
return train_loss
[docs] def validation_step(self, batch, batch_idx: int): # type: ignore
"""
Execute validation step.
"""
val_loss = self._compute_loss(batch)
self.log(
"val_loss", val_loss, on_epoch=True, on_step=False, prog_bar=True
)
return val_loss