# Source code for gluonts.time_feature.lag

# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

from typing import List, Optional

import numpy as np
from pandas.tseries.frequencies import to_offset

from gluonts.time_feature import norm_freq_str


def _make_lags(middle: int, delta: int) -> np.ndarray:
    """
    Create a set of lags around a middle point including +/- delta.
    """
    return np.arange(middle - delta, middle + delta + 1).tolist()


def get_lags_for_frequency(
    freq_str: str,
    lag_ub: int = 1200,
    num_lags: Optional[int] = None,
    num_default_lags: int = 7,
) -> List[int]:
    """
    Generate a list of lags that are appropriate for the given frequency
    string.

    The result always starts with the default lags
    ``[1, 2, ..., num_default_lags]`` (``[1, 2, 3, 4, 5, 6, 7]`` by default).
    The remaining lags correspond to the same `season` (+/- `delta`) in the
    previous `k` cycles, where `delta` and `k` are fixed per granularity by
    the helper functions below.

    Parameters
    ----------
    freq_str
        Frequency string of the form [multiple][granularity] such as "12H",
        "5min", "1D" etc.
    lag_ub
        The maximum value for a lag.
    num_lags
        Maximum number of lags; by default all generated lags are returned.
    num_default_lags
        The number of default lags; by default it is 7.

    Returns
    -------
    List[int]
        The default lags followed by the seasonal lags in increasing order,
        truncated to the first ``num_lags`` entries when ``num_lags`` is
        given. Seasonal lags are strictly greater than
        ``max(7, num_default_lags)`` and at most ``lag_ub``.

    Raises
    ------
    ValueError
        If the normalized frequency name is not one of the supported
        granularities.
    AssertionError
        If a quarterly frequency is given with a multiple other than 1.
    """

    # Each helper returns a list of lag groups (a list of lists). A
    # ``multiple`` may be fractional when a coarser cycle is expressed in
    # units of a finer frequency (e.g. weeks seen from a daily series), which
    # is why floor division is used throughout.

    def _make_lags_for_second(multiple, num_cycles=3):
        # We use previous ``num_cycles`` minutes to generate lags
        return [
            _make_lags(k * 60 // multiple, 2) for k in range(1, num_cycles + 1)
        ]

    def _make_lags_for_minute(multiple, num_cycles=3):
        # We use previous ``num_cycles`` hours to generate lags
        return [
            _make_lags(k * 60 // multiple, 2) for k in range(1, num_cycles + 1)
        ]

    def _make_lags_for_hour(multiple, num_cycles=7):
        # We use previous ``num_cycles`` days to generate lags
        return [
            _make_lags(k * 24 // multiple, 1) for k in range(1, num_cycles + 1)
        ]

    def _make_lags_for_day(
        multiple, num_cycles=4, days_in_week=7, days_in_month=30
    ):
        # We use previous ``num_cycles`` weeks to generate lags
        # We use the last month (in addition to 4 weeks) to generate lag.
        return [
            _make_lags(k * days_in_week // multiple, 1)
            for k in range(1, num_cycles + 1)
        ] + [_make_lags(days_in_month // multiple, 1)]

    def _make_lags_for_week(multiple, num_cycles=3):
        # We use previous ``num_cycles`` years to generate lags
        # Additionally, we use previous 4, 8, 12 weeks
        return [
            _make_lags(k * 52 // multiple, 1) for k in range(1, num_cycles + 1)
        ] + [[4 // multiple, 8 // multiple, 12 // multiple]]

    def _make_lags_for_month(multiple, num_cycles=3):
        # We use previous ``num_cycles`` years to generate lags
        return [
            _make_lags(k * 12 // multiple, 1) for k in range(1, num_cycles + 1)
        ]

    offset = to_offset(freq_str)
    # normalize offset name, so that both `W` and `W-SUN` refer to `W`
    offset_name = norm_freq_str(offset.name)

    if offset_name in ["A", "Y", "YE"]:
        # Yearly data only gets the default lags.
        lags = []
    elif offset_name in ["Q", "QE"]:
        assert (
            offset.n == 1
        ), "Only multiple 1 is supported for quarterly. Use x month instead."
        # A quarter is treated as a monthly series with a multiple of 3.
        lags = _make_lags_for_month(offset.n * 3.0)
    elif offset_name in ["M", "ME"]:
        lags = _make_lags_for_month(offset.n)
    elif offset_name == "W":
        lags = _make_lags_for_week(offset.n)
    elif offset_name == "D":
        lags = _make_lags_for_day(offset.n) + _make_lags_for_week(
            offset.n / 7.0
        )
    elif offset_name == "B":
        # Business days: 5 days per week, 22 days per month.
        lags = _make_lags_for_day(
            offset.n, days_in_week=5, days_in_month=22
        ) + _make_lags_for_week(offset.n / 5.0)
    elif offset_name in ["H", "h"]:
        lags = (
            _make_lags_for_hour(offset.n)
            + _make_lags_for_day(offset.n / 24)
            + _make_lags_for_week(offset.n / (24 * 7))
        )
    # minutes
    elif offset_name in ["T", "min"]:
        lags = (
            _make_lags_for_minute(offset.n)
            + _make_lags_for_hour(offset.n / 60)
            + _make_lags_for_day(offset.n / (60 * 24))
            + _make_lags_for_week(offset.n / (60 * 24 * 7))
        )
    # second
    elif offset_name in ["S", "s"]:
        lags = (
            _make_lags_for_second(offset.n)
            + _make_lags_for_minute(offset.n / 60)
            + _make_lags_for_hour(offset.n / (60 * 60))
        )
    else:
        raise ValueError(f"invalid frequency: {freq_str=}, {offset_name=}")

    # Seasonal lags must lie strictly above the default block so that no lag
    # is emitted twice. The floor of 7 is kept for backward compatibility:
    # with ``num_default_lags <= 7`` the result is unchanged, while a larger
    # ``num_default_lags`` no longer produces duplicated, out-of-order lags.
    min_seasonal_lag = max(7, num_default_lags)

    # flatten lags list, filter, and de-duplicate
    seasonal_lags = {
        int(lag)
        for sub_list in lags
        for lag in sub_list
        if min_seasonal_lag < lag <= lag_ub
    }
    lags = list(range(1, num_default_lags + 1)) + sorted(seasonal_lags)

    # ``num_lags=None`` slices to the full list.
    return lags[:num_lags]