Source code for gluonts.mx.model.deepvar_hierarchical._estimator

# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

# Standard library imports
import logging
from typing import List, Optional

import numpy as np
from mxnet.gluon import HybridBlock
import mxnet as mx

# First-party imports
from gluonts.core.component import validated
from gluonts.mx import DeepVAREstimator
from gluonts.mx.model.predictor import Predictor, RepresentableBlockPredictor
from gluonts.mx.distribution import LowrankMultivariateGaussianOutput
from gluonts.mx.trainer import Trainer
from gluonts.mx.util import copy_parameters
from gluonts.time_feature import TimeFeature
from gluonts.transform import Transformation


# Relative imports
from ._network import (
    DeepVARHierarchicalPredictionNetwork,
    DeepVARHierarchicalTrainingNetwork,
)


logger = logging.getLogger(__name__)


def projection_mat(
    S: np.ndarray,
    D: Optional[np.ndarray] = None,
) -> np.ndarray:
    r"""
    Computes the projection matrix :math:`P` for projecting base forecasts
    :math:`\bar{y}` onto the space of coherent forecasts: :math:`P \bar{y}`.

    More precisely,

    .. math::
        P = S (S^T S)^{-1} S^T, \qquad \text{if } D \text{ is None}, \\
        P = S (S^T D S)^{-1} S^T D, \qquad \text{otherwise}.

    Parameters
    ----------
    S
        The summation or the aggregation matrix. Shape:
        (total_num_time_series, num_bottom_time_series)
    D
        Symmetric positive definite matrix (typically a diagonal matrix).
        Shape: (total_num_time_series, total_num_time_series)
        Optional. If provided, then the distance between the reconciled and
        unreconciled forecasts is calculated based on the norm induced by D.
        Useful for weighing the distances differently for each level of the
        hierarchy. By default Euclidean distance is used.

    Returns
    -------
    Numpy ND array
        Projection matrix, shape
        (total_num_time_series, total_num_time_series)
    """
    if D is None:
        return S @ np.linalg.pinv(S.T @ S) @ S.T
    else:
        assert np.all(D == D.T), "`D` must be symmetric."
        assert np.all(
            np.linalg.eigvals(D) > 0
        ), "`D` must be positive definite."

        return S @ np.linalg.pinv(S.T @ D @ S) @ S.T @ D
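
# A minimal sketch (not part of the original module) illustrating how
# `projection_mat` reconciles an incoherent base forecast on a toy two-level
# hierarchy (total = bottom_1 + bottom_2). Coherent vectors y = S @ b are
# fixed points of the projection, i.e. P @ S == S. The helper name
# `_example_projection` is hypothetical; the function is never called here.
def _example_projection() -> None:
    S = np.array(
        [
            [1.0, 1.0],  # total = bottom_1 + bottom_2
            [1.0, 0.0],  # bottom_1
            [0.0, 1.0],  # bottom_2
        ]
    )
    P = projection_mat(S)
    assert np.allclose(P @ S, S)  # coherent vectors are left unchanged

    y_hat = np.array([10.0, 4.0, 5.0])  # incoherent: 10 != 4 + 5
    y_rec = P @ y_hat  # reconciled forecast
    assert np.isclose(y_rec[0], y_rec[1] + y_rec[2])  # now coherent

    # Weighted variant: a diagonal positive definite D that penalizes
    # deviations at the aggregate level more strongly.
    D = np.diag([2.0, 1.0, 1.0])
    P_w = projection_mat(S, D=D)
    assert np.allclose(P_w @ S, S)  # still fixes coherent vectors
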
class DeepVARHierarchicalEstimator(DeepVAREstimator):
    """
    Constructs a DeepVARHierarchical estimator, which is a hierarchical
    extension of DeepVAR.

    The model has been described in the ICML 2021 paper:
    http://proceedings.mlr.press/v139/rangapuram21a.html

    Parameters
    ----------
    freq
        Frequency of the data to train on and predict
    prediction_length
        Length of the prediction horizon
    S
        Summation or aggregation matrix.
    D
        Positive definite matrix (typically a diagonal matrix). Optional.
        If provided, then the distance between the reconciled and
        unreconciled forecasts is calculated based on the norm induced by D.
        Useful for weighing the distances differently for each level of the
        hierarchy. By default Euclidean distance is used.
    num_samples_for_loss
        Number of samples to draw from the predicted distribution to compute
        the training loss.
    likelihood_weight
        Weight for the negative log-likelihood loss. Default: 0.0.
        If not zero, then the negative log-likelihood (times
        `likelihood_weight`) is added to the CRPS loss (times `CRPS_weight`).
    CRPS_weight
        Weight for the CRPS loss component. Default: 1.0.
        If zero, then the loss is only the negative log-likelihood (times
        `likelihood_weight`). If non-zero, then the CRPS loss (times
        `CRPS_weight`) is added to the negative log-likelihood loss (times
        `likelihood_weight`).
    sample_LH
        Boolean flag to specify if the likelihood should be computed using
        the distribution based on (coherent) samples. Default: False (in
        this case the likelihood is computed using the parametric
        distribution predicted by the network).
    coherent_train_samples
        Flag to indicate whether coherence should be enforced during
        training. Default: True.
    coherent_pred_samples
        Flag to indicate whether coherence should be enforced during
        prediction. Default: True.
    warmstart_epoch_frac
        Specifies the epoch (as a fraction of the total number of epochs)
        from which to start enforcing coherence during training.
    seq_axis
        Specifies the list of axes that should be processed sequentially
        (only during training). The reference axes are:
        (num_samples_for_loss, batch, seq_length, target_dim). This is
        useful if batch processing is not possible because of insufficient
        memory (e.g. if both num_samples_for_loss and target_dim are very
        large). In such cases, use seq_axis = [1]. By default, all axes are
        processed in parallel.
    log_coherency_error
        Flag to indicate whether to compute and show the coherency error of
        the samples generated during prediction.
    trainer
        Trainer object to be used (default: Trainer())
    context_length
        Number of steps to unroll the RNN for before computing predictions
        (default: None, in which case context_length = prediction_length)
    num_layers
        Number of RNN layers (default: 2)
    num_cells
        Number of RNN cells for each layer (default: 40)
    cell_type
        Type of recurrent cells to use (available: 'lstm' or 'gru';
        default: 'lstm')
    num_parallel_samples
        Number of evaluation samples per time series to increase parallelism
        during inference. This is a model optimization that does not affect
        the accuracy (default: 100)
    dropout_rate
        Dropout regularization parameter (default: 0.1)
    use_feat_dynamic_real
        Whether to use the ``feat_dynamic_real`` field from the data
        (default: False)
    cardinality
        Number of values of each categorical feature (default: [1])
    embedding_dimension
        Dimension of the embeddings for categorical features (default: 5)
    scaling
        Whether to automatically scale the target values (default: True)
    pick_incomplete
        Whether training examples can be sampled with only a part of
        past_length time-units
    lags_seq
        Indices of the lagged target values to use as inputs of the RNN
        (default: None, in which case these are automatically determined
        based on freq)
    time_features
        Time features to use as inputs of the RNN (default: None, in which
        case these are automatically determined based on freq)
    batch_size
        The size of the batches to be used during training and prediction.
    """

    @validated()
    def __init__(
        self,
        freq: str,
        prediction_length: int,
        S: np.ndarray,
        D: Optional[np.ndarray] = None,
        num_samples_for_loss: int = 200,
        likelihood_weight: float = 0.0,
        CRPS_weight: float = 1.0,
        sample_LH: bool = False,
        coherent_train_samples: bool = True,
        coherent_pred_samples: bool = True,
        warmstart_epoch_frac: float = 0.0,
        seq_axis: Optional[List[int]] = None,
        log_coherency_error: bool = True,
        trainer: Trainer = Trainer(),
        context_length: Optional[int] = None,
        num_layers: int = 2,
        num_cells: int = 40,
        cell_type: str = "lstm",
        num_parallel_samples: int = 100,
        dropout_rate: float = 0.1,
        use_feat_dynamic_real: bool = False,
        cardinality: List[int] = [1],
        embedding_dimension: int = 5,
        scaling: bool = True,
        pick_incomplete: bool = False,
        lags_seq: Optional[List[int]] = None,
        time_features: Optional[List[TimeFeature]] = None,
        batch_size: int = 32,
        **kwargs,
    ) -> None:
        # This implementation only works for a multivariate Gaussian with
        # diagonal covariance and no transformation. Fixing them here
        # upfront. If the method is extended, then these can be passed as
        # arguments of the estimator.
        rank = 0
        target_dim = len(S)
        distr_output = LowrankMultivariateGaussianOutput(
            dim=target_dim, rank=rank
        )
        use_marginal_transformation = False
        conditioning_length = 0

        # This estimator doesn't work in symbolic mode.
        if trainer.hybridize:
            logger.info(
                "Resetting `hybridize` flag of trainer to False, "
                f"since {__name__} does not work in symbolic mode."
            )
            trainer.hybridize = False

        super().__init__(
            freq=freq,
            prediction_length=prediction_length,
            target_dim=target_dim,
            context_length=context_length,
            num_layers=num_layers,
            num_cells=num_cells,
            cell_type=cell_type,
            num_parallel_samples=num_parallel_samples,
            dropout_rate=dropout_rate,
            use_feat_dynamic_real=use_feat_dynamic_real,
            cardinality=cardinality,
            embedding_dimension=embedding_dimension,
            distr_output=distr_output,
            rank=rank,
            scaling=scaling,
            pick_incomplete=pick_incomplete,
            lags_seq=lags_seq,
            time_features=time_features,
            conditioning_length=conditioning_length,
            use_marginal_transformation=use_marginal_transformation,
            trainer=trainer,
            batch_size=batch_size,
            **kwargs,
        )

        # Assert that projection is *not* being done only during training
        assert coherent_pred_samples or (
            not coherent_train_samples
        ), "Cannot project only during training (and not during prediction)"

        M = projection_mat(S=S, D=D)
        self.S = S
        ctx = self.trainer.ctx
        self.M = mx.nd.array(M, ctx=ctx)
        self.num_samples_for_loss = num_samples_for_loss
        self.likelihood_weight = likelihood_weight
        self.CRPS_weight = CRPS_weight
        self.log_coherency_error = log_coherency_error
        self.coherent_train_samples = coherent_train_samples
        self.coherent_pred_samples = coherent_pred_samples
        self.warmstart_epoch_frac = warmstart_epoch_frac
        self.sample_LH = sample_LH
        self.seq_axis = seq_axis
    def create_training_network(self) -> DeepVARHierarchicalTrainingNetwork:
        return DeepVARHierarchicalTrainingNetwork(
            M=self.M,
            S=self.S,
            num_samples_for_loss=self.num_samples_for_loss,
            likelihood_weight=self.likelihood_weight,
            CRPS_weight=self.CRPS_weight,
            seq_axis=self.seq_axis,
            coherent_train_samples=self.coherent_train_samples,
            warmstart_epoch_frac=self.warmstart_epoch_frac,
            epochs=self.trainer.epochs,
            num_batches_per_epoch=self.trainer.num_batches_per_epoch,
            sample_LH=self.sample_LH,
            target_dim=self.target_dim,
            num_layers=self.num_layers,
            num_cells=self.num_cells,
            cell_type=self.cell_type,
            history_length=self.history_length,
            context_length=self.context_length,
            prediction_length=self.prediction_length,
            distr_output=self.distr_output,
            dropout_rate=self.dropout_rate,
            cardinality=self.cardinality,
            embedding_dimension=self.embedding_dimension,
            lags_seq=self.lags_seq,
            scaling=self.scaling,
        )
    def create_predictor(
        self, transformation: Transformation, trained_network: HybridBlock
    ) -> Predictor:
        prediction_splitter = self._create_instance_splitter("test")

        prediction_network = DeepVARHierarchicalPredictionNetwork(
            M=self.M,
            S=self.S,
            log_coherency_error=self.log_coherency_error,
            coherent_pred_samples=self.coherent_pred_samples,
            target_dim=self.target_dim,
            num_parallel_samples=self.num_parallel_samples,
            num_layers=self.num_layers,
            num_cells=self.num_cells,
            cell_type=self.cell_type,
            history_length=self.history_length,
            context_length=self.context_length,
            prediction_length=self.prediction_length,
            distr_output=self.distr_output,
            dropout_rate=self.dropout_rate,
            cardinality=self.cardinality,
            embedding_dimension=self.embedding_dimension,
            lags_seq=self.lags_seq,
            scaling=self.scaling,
        )

        copy_parameters(trained_network, prediction_network)

        return RepresentableBlockPredictor(
            input_transform=transformation + prediction_splitter,
            prediction_net=prediction_network,
            batch_size=self.batch_size,
            prediction_length=self.prediction_length,
            ctx=self.trainer.ctx,
            output_transform=self.output_transform,
        )
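
# A hedged end-to-end sketch (not part of the original module): constructing
# and training the estimator on the same toy 3-node hierarchy as above. The
# names `_example_train` and `dataset` are placeholders; `dataset` is assumed
# to be a GluonTS multivariate dataset whose target has one row per node of
# the hierarchy, ordered like the rows of S.
def _example_train(dataset) -> Predictor:
    S = np.array(
        [
            [1.0, 1.0],  # total = bottom_1 + bottom_2
            [1.0, 0.0],  # bottom_1
            [0.0, 1.0],  # bottom_2
        ]
    )
    estimator = DeepVARHierarchicalEstimator(
        freq="H",
        prediction_length=24,
        S=S,
        trainer=Trainer(epochs=10),
    )
    return estimator.train(dataset)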