Source code for gluonts.mx.model.deepvar_hierarchical._estimator

# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

# Standard library imports
import logging
from typing import List, Optional

# Third-party imports
import mxnet as mx
import numpy as np
from mxnet.gluon import HybridBlock

# First-party imports
from gluonts.core.component import validated
from gluonts.mx import DeepVAREstimator
from gluonts.mx.model.predictor import Predictor
from gluonts.mx.model.predictor import RepresentableBlockPredictor
from gluonts.mx.distribution import LowrankMultivariateGaussianOutput
from gluonts.mx.trainer import Trainer
from gluonts.mx.util import copy_parameters
from gluonts.time_feature import TimeFeature
from gluonts.transform import Transformation


# Relative imports
from ._network import (
    DeepVARHierarchicalPredictionNetwork,
    DeepVARHierarchicalTrainingNetwork,
)


logger = logging.getLogger(__name__)


def constraint_mat(S: np.ndarray) -> np.ndarray:
    """
    Generates the constraint matrix in the equation: Ay = 0 (y being the
    values/forecasts of all time series in the hierarchy).

    Parameters
    ----------
    S
        Summation or aggregation matrix. Shape:
        (total_num_time_series, num_bottom_time_series)

    Returns
    -------
    Numpy ND array
        Coefficient matrix of the linear constraints, shape
        (num_agg_time_series, num_time_series)
    """
    # Re-arrange the S matrix to form the A matrix:
    # S = [S_agg | I_{m_K}]^T,  dim: (m, m_K)
    # A = [I_{m_agg} | -S_agg], dim: (m_agg, m)
    m, m_K = S.shape
    m_agg = m - m_K

    # The top `m_agg` rows of the matrix `S` give the aggregation constraint
    # matrix.
    S_agg = S[:m_agg, :]
    A = np.hstack((np.eye(m_agg), -S_agg))
    return A
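
# Illustrative sketch (not part of the library): for a three-series hierarchy
# y = [y_total, y_1, y_2] with y_total = y_1 + y_2, the summation matrix is
# S = [[1, 1], [1, 0], [0, 1]] and the constraint matrix comes out as:
#
#   >>> A = constraint_mat(np.array([[1.0, 1.0], [1.0, 0.0], [0.0, 1.0]]))
#   >>> A
#   array([[ 1., -1., -1.]])
#
# so any coherent vector y satisfies A @ y == 0 (y_total - y_1 - y_2 == 0).
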
def null_space_projection_mat(A: np.ndarray) -> np.ndarray:
    """
    Computes the projection matrix for projecting onto the null space of A.

    Parameters
    ----------
    A
        The constraint matrix A in the equation: Ay = 0 (y being the
        values/forecasts of all time series in the hierarchy).

    Returns
    -------
    Numpy ND array
        Projection matrix, shape
        (total_num_time_series, total_num_time_series)
    """
    num_ts = A.shape[1]
    return np.eye(num_ts) - A.T @ np.linalg.pinv(A @ A.T) @ A
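
# Illustrative sketch (not part of the library): continuing the three-series
# example above, M = I - A^T (A A^T)^+ A is the orthogonal projector onto the
# null space of A, so it maps an incoherent sample to the closest (in
# Euclidean distance) coherent one:
#
#   >>> A = constraint_mat(np.array([[1.0, 1.0], [1.0, 0.0], [0.0, 1.0]]))
#   >>> M = null_space_projection_mat(A)
#   >>> M @ np.array([10.0, 4.0, 5.0])  # incoherent: 4 + 5 != 10
#   array([9.66666667, 4.33333333, 5.33333333])
#
# The projected vector is coherent: 4.3333... + 5.3333... == 9.6666...
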
class DeepVARHierarchicalEstimator(DeepVAREstimator):
    """
    Constructs a DeepVARHierarchical estimator, which is a hierarchical
    extension of DeepVAR.

    The model has been described in the ICML 2021 paper:
    http://proceedings.mlr.press/v139/rangapuram21a.html

    Parameters
    ----------
    freq
        Frequency of the data to train on and predict
    prediction_length
        Length of the prediction horizon
    target_dim
        Dimensionality of the input dataset (i.e., the total number of time
        series in the hierarchical dataset).
    S
        Summation or aggregation matrix.
    num_samples_for_loss
        Number of samples to draw from the predicted distribution to compute
        the training loss.
    likelihood_weight
        Weight for the negative log-likelihood loss. Default: 0.0.
        If not zero, then negative log-likelihood (times `likelihood_weight`)
        is added to the CRPS loss (times `CRPS_weight`).
    CRPS_weight
        Weight for the CRPS loss component. Default: 1.0.
        If zero, then the loss is only negative log-likelihood (times
        `likelihood_weight`). If non-zero, then CRPS loss (times
        `CRPS_weight`) is added to the negative log-likelihood loss (times
        `likelihood_weight`).
    sample_LH
        Boolean flag to specify if likelihood should be computed using the
        distribution based on (coherent) samples. Default: False (in this
        case likelihood is computed using the parametric distribution
        predicted by the network).
    coherent_train_samples
        Flag to indicate whether coherence should be enforced during
        training. Default: True.
    coherent_pred_samples
        Flag to indicate whether coherence should be enforced during
        prediction. Default: True.
    warmstart_epoch_frac
        Specifies the epoch (as a fraction of the total number of epochs)
        from when to start enforcing coherence during training.
    seq_axis
        Specifies the list of axes that should be processed sequentially
        (only during training). The reference axes are:
        (num_samples_for_loss, batch, seq_length, target_dim). This is
        useful if batch processing is not possible because of insufficient
        memory (e.g. if both num_samples_for_loss and target_dim are very
        large). In such cases, use seq_axis = [1]. By default, all axes are
        processed in parallel.
    log_coherency_error
        Flag to indicate whether to compute and show the coherency error on
        the samples generated during prediction.
    trainer
        Trainer object to be used (default: Trainer())
    context_length
        Number of steps to unroll the RNN for before computing predictions
        (default: None, in which case context_length = prediction_length)
    num_layers
        Number of RNN layers (default: 2)
    num_cells
        Number of RNN cells for each layer (default: 40)
    cell_type
        Type of recurrent cells to use (available: 'lstm' or 'gru';
        default: 'lstm')
    num_parallel_samples
        Number of evaluation samples per time series to increase parallelism
        during inference. This is a model optimization that does not affect
        the accuracy (default: 100)
    dropout_rate
        Dropout regularization parameter (default: 0.1)
    use_feat_dynamic_real
        Whether to use the ``feat_dynamic_real`` field from the data
        (default: False)
    cardinality
        Number of values of each categorical feature (default: [1])
    embedding_dimension
        Dimension of the embeddings for categorical features (default: 5)
    scaling
        Whether to automatically scale the target values (default: True)
    pick_incomplete
        Whether training examples can be sampled with only a part of
        past_length time-units
    lags_seq
        Indices of the lagged target values to use as inputs of the RNN
        (default: None, in which case these are automatically determined
        based on freq)
    time_features
        Time features to use as inputs of the RNN (default: None, in which
        case these are automatically determined based on freq)
    batch_size
        The size of the batches to be used during training and prediction.
    """

    @validated()
    def __init__(
        self,
        freq: str,
        prediction_length: int,
        target_dim: int,
        S: np.ndarray,
        num_samples_for_loss: int = 200,
        likelihood_weight: float = 0.0,
        CRPS_weight: float = 1.0,
        sample_LH: bool = False,
        coherent_train_samples: bool = True,
        coherent_pred_samples: bool = True,
        warmstart_epoch_frac: float = 0.0,
        seq_axis: Optional[List[int]] = None,
        log_coherency_error: bool = True,
        trainer: Trainer = Trainer(),
        context_length: Optional[int] = None,
        num_layers: int = 2,
        num_cells: int = 40,
        cell_type: str = "lstm",
        num_parallel_samples: int = 100,
        dropout_rate: float = 0.1,
        use_feat_dynamic_real: bool = False,
        cardinality: List[int] = [1],
        embedding_dimension: int = 5,
        scaling: bool = True,
        pick_incomplete: bool = False,
        lags_seq: Optional[List[int]] = None,
        time_features: Optional[List[TimeFeature]] = None,
        batch_size: int = 32,
        **kwargs,
    ) -> None:
        # This implementation only works for multivariate Gaussian with
        # diagonal covariance and no transformation. Fixing them here
        # upfront. If the method is extended, then these can be passed as
        # arguments of the estimator.
        rank = 0
        distr_output = LowrankMultivariateGaussianOutput(
            dim=target_dim, rank=rank
        )
        use_marginal_transformation = False
        conditioning_length = 0

        # This estimator doesn't work in symbolic mode.
        if trainer.hybridize:
            logger.info(
                f"Resetting `hybridize` flag of trainer to False, "
                f"since {__name__} does not work in symbolic mode."
            )
            trainer.hybridize = False

        super().__init__(
            freq=freq,
            prediction_length=prediction_length,
            target_dim=target_dim,
            context_length=context_length,
            num_layers=num_layers,
            num_cells=num_cells,
            cell_type=cell_type,
            num_parallel_samples=num_parallel_samples,
            dropout_rate=dropout_rate,
            use_feat_dynamic_real=use_feat_dynamic_real,
            cardinality=cardinality,
            embedding_dimension=embedding_dimension,
            distr_output=distr_output,
            rank=rank,
            scaling=scaling,
            pick_incomplete=pick_incomplete,
            lags_seq=lags_seq,
            time_features=time_features,
            conditioning_length=conditioning_length,
            use_marginal_transformation=use_marginal_transformation,
            trainer=trainer,
            batch_size=batch_size,
            **kwargs,
        )

        assert target_dim == S.shape[0], (
            "The number of rows of the `S` matrix must be equal to "
            "`target_dim`. Either the `S` matrix is incorrectly constructed "
            "or a wrong value is passed for `target_dim`: shape of `S`: "
            f"{S.shape} and `target_dim`: {target_dim}."
        )

        # Assert that projection is *not* being done only during training
        assert coherent_pred_samples or not coherent_train_samples, (
            "Cannot project only during training (and not during prediction)"
        )

        A = constraint_mat(S.astype(self.dtype))
        M = null_space_projection_mat(A)
        ctx = self.trainer.ctx
        self.M, self.A = mx.nd.array(M, ctx=ctx), mx.nd.array(A, ctx=ctx)

        self.num_samples_for_loss = num_samples_for_loss
        self.likelihood_weight = likelihood_weight
        self.CRPS_weight = CRPS_weight
        self.log_coherency_error = log_coherency_error
        self.coherent_train_samples = coherent_train_samples
        self.coherent_pred_samples = coherent_pred_samples
        self.warmstart_epoch_frac = warmstart_epoch_frac
        self.sample_LH = sample_LH
        self.seq_axis = seq_axis
    def create_training_network(self) -> DeepVARHierarchicalTrainingNetwork:
        return DeepVARHierarchicalTrainingNetwork(
            M=self.M,
            A=self.A,
            num_samples_for_loss=self.num_samples_for_loss,
            likelihood_weight=self.likelihood_weight,
            CRPS_weight=self.CRPS_weight,
            seq_axis=self.seq_axis,
            coherent_train_samples=self.coherent_train_samples,
            warmstart_epoch_frac=self.warmstart_epoch_frac,
            epochs=self.trainer.epochs,
            num_batches_per_epoch=self.trainer.num_batches_per_epoch,
            sample_LH=self.sample_LH,
            target_dim=self.target_dim,
            num_layers=self.num_layers,
            num_cells=self.num_cells,
            cell_type=self.cell_type,
            history_length=self.history_length,
            context_length=self.context_length,
            prediction_length=self.prediction_length,
            distr_output=self.distr_output,
            dropout_rate=self.dropout_rate,
            cardinality=self.cardinality,
            embedding_dimension=self.embedding_dimension,
            lags_seq=self.lags_seq,
            scaling=self.scaling,
        )
    def create_predictor(
        self, transformation: Transformation, trained_network: HybridBlock
    ) -> Predictor:
        prediction_splitter = self._create_instance_splitter("test")

        prediction_network = DeepVARHierarchicalPredictionNetwork(
            M=self.M,
            A=self.A,
            log_coherency_error=self.log_coherency_error,
            coherent_pred_samples=self.coherent_pred_samples,
            target_dim=self.target_dim,
            num_parallel_samples=self.num_parallel_samples,
            num_layers=self.num_layers,
            num_cells=self.num_cells,
            cell_type=self.cell_type,
            history_length=self.history_length,
            context_length=self.context_length,
            prediction_length=self.prediction_length,
            distr_output=self.distr_output,
            dropout_rate=self.dropout_rate,
            cardinality=self.cardinality,
            embedding_dimension=self.embedding_dimension,
            lags_seq=self.lags_seq,
            scaling=self.scaling,
        )

        copy_parameters(trained_network, prediction_network)

        return RepresentableBlockPredictor(
            input_transform=transformation + prediction_splitter,
            prediction_net=prediction_network,
            batch_size=self.batch_size,
            prediction_length=self.prediction_length,
            ctx=self.trainer.ctx,
            output_transform=self.output_transform,
        )
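
# Usage sketch (illustrative; `train_dataset` is a hypothetical GluonTS
# multivariate dataset). The rows of the target and of `S` must follow the
# same ordering, with the aggregated series on top:
#
#   >>> S = np.array([[1.0, 1.0], [1.0, 0.0], [0.0, 1.0]])
#   >>> estimator = DeepVARHierarchicalEstimator(
#   ...     freq="H",
#   ...     prediction_length=24,
#   ...     target_dim=S.shape[0],
#   ...     S=S,
#   ...     trainer=Trainer(epochs=10),
#   ... )
#   >>> # predictor = estimator.train(train_dataset)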