# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
import math
import random
from abc import abstractmethod
from typing import (
Callable,
Dict,
List,
NamedTuple,
Optional,
Tuple,
Union,
cast,
)
import numpy as np
import pandas as pd
from pandas.tseries.frequencies import to_offset
from pandas.tseries.offsets import BaseOffset, Week
from gluonts.dataset.artificial.recipe import (
BinaryHolidays,
BinaryMarkovChain,
Constant,
ForEachCat,
Lag,
LinearTrend,
RandomCat,
RandomGaussian,
Stack,
generate,
take_as_list,
)
from gluonts.dataset.common import (
BasicFeatureInfo,
CategoricalFeatureInfo,
DataEntry,
Dataset,
ListDataset,
MetaData,
TrainDatasets,
)
from gluonts.dataset.field_names import FieldName
from gluonts.dataset.stat import (
DatasetStatistics,
calculate_dataset_statistics,
)


class DatasetInfo(NamedTuple):
    """
    Information stored on a dataset.

    When downloading from the repository, the dataset repository checks that
    the obtained version matches the one declared in
    dataset_info/dataset_name.json.
    """
name: str
metadata: MetaData
prediction_length: int
train_statistics: DatasetStatistics
test_statistics: DatasetStatistics


class ArtificialDataset:
"""
Parent class of a dataset that can be generated from code.
"""
def __init__(self, freq) -> None:
self.freq: BaseOffset = to_offset(freq)
@property
@abstractmethod
def metadata(self) -> MetaData:
pass
@property
@abstractmethod
def train(self) -> List[DataEntry]:
pass
@property
@abstractmethod
def test(self) -> List[DataEntry]:
pass

    # TODO: return the same type as the dataset repository for better
    # usability
    def generate(self) -> TrainDatasets:
return TrainDatasets(
metadata=self.metadata,
train=ListDataset(self.train, self.freq),
test=ListDataset(self.test, self.freq),
)
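
# Usage sketch (illustrative only): concrete subclasses such as
# ConstantDataset below are consumed through generate(), e.g.
#
#     ds = ConstantDataset(num_timeseries=3, num_steps=30, freq="1H")
#     splits = ds.generate()          # TrainDatasets
#     for entry in splits.train:      # entries are DataEntry dicts
#         print(entry["item_id"], entry["target"][:5])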


class ConstantDataset(ArtificialDataset):
def __init__(
self,
num_timeseries: int = 10,
num_steps: int = 30,
freq: str = "1H",
start: str = "2000-01-01 00:00:00",
# Generates constant dataset of 0s with explicit NaN missing values
is_nan: bool = False,
# Inserts random constant value for each time series
is_random_constant: bool = False,
# Generates constants on various scales
is_different_scales: bool = False,
# Determines whether the time series in the test
# and train set should have different constant values
is_piecewise: bool = False,
# Determines whether to add Gaussian noise to the constant dataset
is_noise: bool = False,
# Determines whether some time series will have very long lengths
is_long: bool = False,
# Determines whether some time series will have very short lengths
is_short: bool = False,
# Determines whether to add linear trends
is_trend: bool = False,
# Number of missing values in the middle of the time series
num_missing_middle: int = 0,
# Determines whether to add promotions to the target time series
# and to store in metadata
is_promotions: bool = False,
        # Holiday dates to add to the target time series and to store in
        # metadata
        holidays: Optional[List[pd.Timestamp]] = None,
) -> None:
super().__init__(freq)
self.num_timeseries = num_timeseries
self.num_steps = num_steps
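        # Use roughly 80% of the steps for training; the remainder becomes
        # the prediction range.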
self.num_training_steps = self.num_steps // 10 * 8
self.prediction_length = self.num_steps - self.num_training_steps
self.is_nan = is_nan
self.is_random_constant = is_random_constant
self.is_different_scales = is_different_scales
self.is_piecewise = is_piecewise
self.is_noise = is_noise
self.is_long = is_long
self.is_short = is_short
self.is_trend = is_trend
self.num_missing_middle = num_missing_middle
self.is_promotions = is_promotions
self.holidays = holidays
if isinstance(self.freq, Week):
self.freq = Week(
self.freq.n, weekday=pd.Timestamp(start).weekday()
)
self.start = cast(pd.Period, pd.Period(start, self.freq))
@property
def metadata(self) -> MetaData:
metadata = MetaData(
freq=self.freq.freqstr,
feat_static_cat=[
{
"name": "feat_static_cat_000",
"cardinality": str(self.num_timeseries),
}
],
feat_static_real=[{"name": "feat_static_real_000"}],
prediction_length=self.prediction_length,
)
if self.is_promotions or self.holidays:
metadata = MetaData(
freq=self.freq.freqstr,
feat_static_cat=[
{
"name": "feat_static_cat_000",
"cardinality": str(self.num_timeseries),
}
],
feat_static_real=[{"name": "feat_static_real_000"}],
feat_dynamic_real=[
BasicFeatureInfo(name=FieldName.FEAT_DYNAMIC_REAL)
],
prediction_length=self.prediction_length,
)
return metadata

    def determine_constant(
self, index: int, constant: Optional[float] = None, seed: int = 1
) -> Optional[float]:
if self.is_random_constant:
my_random = random.Random(seed)
constant = (index + 1) * my_random.random()
elif self.is_different_scales:
if index == 0:
constant = 1e-8
elif constant is not None:
constant *= 100
else:
constant = float(index)
return constant
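
    # For example, with default flags determine_constant(i) returns float(i);
    # with is_random_constant=True it returns (i + 1) * Random(seed).random();
    # with is_different_scales=True successive calls yield 1e-8, 1e-6,
    # 1e-4, ... (each call multiplies the previous constant by 100).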

    def compute_data_from_recipe(
self,
num_steps: int,
constant: Optional[float] = None,
one_to_zero: float = 0.1,
zero_to_one: float = 0.1,
scale_features: float = 200,
) -> TrainDatasets:
recipe = []
recipe_type = Constant(constant)
if self.is_noise:
recipe_type += RandomGaussian() # Use default stddev = 1.0
if self.is_trend:
recipe_type += LinearTrend()
if self.is_promotions:
recipe.append(
("binary_causal", BinaryMarkovChain(one_to_zero, zero_to_one))
)
recipe.append(
(FieldName.FEAT_DYNAMIC_REAL, Stack(["binary_causal"]))
)
recipe_type += scale_features * Lag("binary_causal", lag=0)
if self.holidays:
# Compute dates array
dates = list(
pd.period_range(self.start, periods=num_steps, freq=self.freq)
)
recipe.append(
("binary_holidays", BinaryHolidays(dates, self.holidays))
)
recipe.append(
(FieldName.FEAT_DYNAMIC_REAL, Stack(["binary_holidays"]))
)
recipe_type += scale_features * Lag("binary_holidays", lag=0)
recipe.append((FieldName.TARGET, recipe_type))
max_train_length = num_steps - self.prediction_length
data = RecipeDataset(
recipe=recipe,
metadata=self.metadata,
max_train_length=max_train_length,
prediction_length=self.prediction_length,
            # Add one time series at a time in the loop, for a different
            # constant value per time series
num_timeseries=1,
)
generated = data.generate()
return generated

    def piecewise_constant(self, index: int, num_steps: int) -> List:
target = []
for j in range(num_steps):
if j < self.num_training_steps:
constant = self.determine_constant(index=index)
else:
constant = self.determine_constant(index=index, seed=2)
target.append(constant)
return target

    def get_num_steps(
self,
index: int,
num_steps_max: int = 10000,
long_freq: int = 4,
num_steps_min: int = 2,
short_freq: int = 4,
) -> int:
num_steps = self.num_steps
if self.is_long and index % long_freq == 0:
num_steps = num_steps_max
elif self.is_short and index % short_freq == 0:
num_steps = num_steps_min
return num_steps

    @staticmethod
def insert_nans_and_zeros(ts_len: int) -> List:
target = []
for j in range(ts_len):
            # Place NaNs at even indices; by convention there are no NaNs
            # before the start date, so index 0 stays 0.0.
if j != 0 and j % 2 == 0:
target.append(np.nan)
# Place zeros at odd indices
else:
target.append(0.0)
return target
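
    # e.g. insert_nans_and_zeros(5) -> [0.0, 0.0, nan, 0.0, nan]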

    def insert_missing_vals_middle(
self, ts_len: int, constant: Optional[float]
) -> List[Optional[float]]:
target: List[Optional[float]] = []
lower_bound = (self.num_training_steps - self.num_missing_middle) // 2
upper_bound = (self.num_training_steps + self.num_missing_middle) // 2
num_missing_endpts = math.floor(0.1 * self.num_missing_middle)
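        # Note: the modulo tests below divide by 2 * num_missing_endpts, so
        # this assumes num_missing_middle >= 10 (i.e. num_missing_endpts is
        # at least 1).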
for j in range(ts_len):
            if (
                (0 < j < lower_bound and j % (2 * num_missing_endpts) == 0)
                or lower_bound <= j < upper_bound
                or (j >= upper_bound and j % (2 * num_missing_endpts) == 0)
            ):
target.append(float("nan"))
else:
target.append(constant)
return target

    def generate_ts(
self, num_ts_steps: int, is_train: bool = False
) -> List[DataEntry]:
res = []
constant = None
for i in range(self.num_timeseries):
if self.is_nan:
target = self.insert_nans_and_zeros(num_ts_steps)
elif self.is_piecewise:
target = self.piecewise_constant(i, num_ts_steps)
else:
constant = self.determine_constant(i, constant)
if self.num_missing_middle > 0:
target = self.insert_missing_vals_middle(
num_ts_steps, constant
)
elif (
self.is_noise
or self.is_trend
or self.is_promotions
or self.holidays
):
num_steps = self.get_num_steps(i)
generated = self.compute_data_from_recipe(
num_steps, constant
)
if is_train:
time_series = generated.train
else:
assert generated.test is not None
time_series = generated.test
                    # The recipe output is an np.ndarray; convert to a list
                    # for consistency
target = list(time_series)[0][FieldName.TARGET].tolist()
else:
target = [constant] * num_ts_steps
ts_data = dict(
start=self.start,
target=target,
item_id=str(i),
feat_static_cat=[i],
feat_static_real=[i],
)
if self.is_promotions or self.holidays:
ts_data[FieldName.FEAT_DYNAMIC_REAL] = list(time_series)[0][
FieldName.FEAT_DYNAMIC_REAL
].tolist()
res.append(ts_data)
return res
@property
def train(self) -> List[DataEntry]:
return self.generate_ts(
num_ts_steps=self.num_training_steps, is_train=True
)
@property
def test(self) -> List[DataEntry]:
return self.generate_ts(num_ts_steps=self.num_steps)


class ComplexSeasonalTimeSeries(ArtificialDataset):
    """
    Generate sinusoidal time series that ramp up to a certain amplitude and
    level, with additional spikes on special time points (called the "sunday
    effect" in the code, although for daily data the spikes fall on Mondays,
    i.e. dayofweek == 0).
    TODO: This could be converted to a RecipeDataset to avoid code
    duplication.
    """
def __init__(
self,
num_series: int = 100,
prediction_length: int = 20,
freq_str: str = "D",
length_low: int = 30,
length_high: int = 200,
min_val: float = -10000,
max_val: float = 10000,
is_integer: bool = False,
proportion_missing_values: float = 0,
is_noise: bool = True,
is_scale: bool = True,
percentage_unique_timestamps: float = 0.07,
is_out_of_bounds_date: bool = False,
seasonality: Optional[int] = None,
clip_values: bool = False,
) -> None:
"""
:param num_series: number of time series generated in the train and
test set
        :param prediction_length: length of the prediction range
        :param freq_str: pandas frequency string, e.g. "D" or "H"
:param length_low: minimum length of a time-series, must be larger than
prediction_length
:param length_high: maximum length of a time-series
:param min_val: min value of a time-series
:param max_val: max value of a time-series
:param is_integer: whether the dataset has integers or not
        :param proportion_missing_values: fraction (strictly below 1.0) of
            time points replaced by missing-value markers
:param is_noise: whether to add noise
:param is_scale: whether to add scale
:param percentage_unique_timestamps: percentage of random start dates
bounded between 0 and 1
:param is_out_of_bounds_date: determines whether to use very old start
dates and start dates far in the future
:param seasonality: Seasonality of the generated data. If not given
uses default seasonality for frequency
:param clip_values: if True the values will be clipped to
[min_val, max_val], otherwise linearly scales them
"""
assert length_low > prediction_length
super().__init__(freq_str)
self.num_series = num_series
self.prediction_length = prediction_length
self.length_low = length_low
self.length_high = length_high
self.freq_str = freq_str
self.min_val = min_val
self.max_val = max_val
self.is_integer = is_integer
self.proportion_missing_values = proportion_missing_values
self.is_noise = is_noise
self.is_scale = is_scale
self.percentage_unique_timestamps = percentage_unique_timestamps
self.is_out_of_bounds_date = is_out_of_bounds_date
self.seasonality = seasonality
self.clip_values = clip_values
@property
def metadata(self) -> MetaData:
return MetaData(
freq=self.freq.freqstr, prediction_length=self.prediction_length
)
def _get_period(self) -> int:
if self.seasonality is not None:
return self.seasonality
if self.freq_str == "M":
return 24
elif self.freq_str == "W":
return 52
elif self.freq_str == "D":
return 14
elif self.freq_str == "H":
return 24
elif self.freq_str == "min":
return 60
        else:
            raise RuntimeError(f"Unsupported freq_str {self.freq_str}")
def _get_start(self, index: int, my_random: random.Random) -> str:
        if (
            self.is_out_of_bounds_date and index == 0
        ):  # Edge case: a start date far in the past
            start_y, start_m, start_d = (
                1690,
                2,
                7,
            )  # pandas Timestamps cannot go much further back (min is 1677)
            start_h, start_min = 18, 36
        elif (
            self.is_out_of_bounds_date and index == self.num_series - 1
        ):  # Edge case: a start date far in the future
            start_y, start_m, start_d = (
                2030,
                6,
                3,
            )
            start_h, start_min = 18, 36
        # Assume that only a fraction `percentage_unique_timestamps` of the
        # start timestamps are unique
elif my_random.random() < self.percentage_unique_timestamps:
start_y = my_random.randint(2000, 2018)
start_m = my_random.randint(1, 12)
start_d = my_random.randint(1, 28)
start_h = my_random.randint(0, 23)
start_min = my_random.randint(0, 59)
else:
start_y, start_m, start_d = 2013, 11, 28
start_h, start_min = 18, 36
        if self.freq_str == "M":
            return "%04d-%02d" % (start_y, start_m)
        elif self.freq_str in ["W", "D"]:
            return "%04d-%02d-%02d" % (start_y, start_m, start_d)
        elif self.freq_str == "H":
            return "%04d-%02d-%02d %02d:00:00" % (
                start_y,
                start_m,
                start_d,
                start_h,
            )
        else:
            return "%04d-%02d-%02d %02d:%02d:00" % (
                start_y,
                start_m,
                start_d,
                start_h,
                start_min,
            )
def _special_time_point_indicator(self, index) -> bool:
if self.freq_str == "M":
return index.month == 1
elif self.freq_str == "W":
return index.month % 2 == 0
elif self.freq_str == "D":
return index.dayofweek == 0
elif self.freq_str == "H":
return index.hour == 0
elif self.freq_str == "min":
return index.minute % 30 == 0
        else:
            raise RuntimeError(f'Bad freq_str value "{self.freq_str}"')
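
    # For daily data this marks Mondays (dayofweek == 0); the resulting
    # boolean array scales the spike term ("sunday_effect") in
    # make_timeseries below.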
@property
def train(self) -> List[DataEntry]:
return [
dict(
start=ts[FieldName.START],
target=ts[FieldName.TARGET][: -self.prediction_length],
item_id=ts[FieldName.ITEM_ID],
)
for ts in self.make_timeseries()
]
@property
def test(self) -> List[DataEntry]:
return self.make_timeseries()

    def make_timeseries(self, seed: int = 1) -> List[DataEntry]:
        res = []

        def sigmoid(x: np.ndarray) -> np.ndarray:
            return 1.0 / (1.0 + np.exp(-x))

        # Fix the seed so that the two independent calls (train and test)
        # produce the same start dates and agree on everything but the last
        # prediction_length points.
        my_random = random.Random(seed)
        state = np.random.RandomState(seed)
for i in range(self.num_series):
val_range = self.max_val - self.min_val
length = state.randint(low=self.length_low, high=self.length_high)
start = self._get_start(i, my_random)
envelope = sigmoid((np.arange(length) - 20.0) / 10.0)
level = 0.3 * val_range * (state.random_sample() - 0.5)
phi = 2 * np.pi * state.random_sample()
period = self._get_period()
w = 2 * np.pi / period
t = np.arange(length)
idx = pd.period_range(
start=start, freq=self.freq_str, periods=length
)
special_tp_indicator = self._special_time_point_indicator(idx)
sunday_effect = state.random_sample() * special_tp_indicator
v = np.sin(w * t + phi) + sunday_effect
if self.is_scale:
scale = 0.1 * val_range * state.random_sample()
v *= scale
v += level
if self.is_noise:
noise_range = 0.02 * val_range * state.random_sample()
noise = noise_range * state.normal(size=length)
v += noise
v = envelope * v
if self.clip_values:
np.clip(v, a_min=self.min_val, a_max=self.max_val, out=v)
            else:
                # Rather than mapping [v_min, v_max] to
                # [self.min_val, self.max_val], which would give every time
                # series the same min and max, we keep the interval length
                # (v_max - v_min): the interval [v_min, v_max] is shifted
                # into [self.min_val, self.max_val] and clipped if needed.
v_min, v_max = v.min(), v.max()
p_min, p_max = (
max(self.min_val, v_min),
min(self.max_val, v_max),
)
shifted_min = np.clip(
p_min + (p_max - v_max),
a_min=self.min_val,
a_max=self.max_val,
)
shifted_max = np.clip(
p_max + (p_min - v_min),
a_min=self.min_val,
a_max=self.max_val,
)
v = shifted_min + (shifted_max - shifted_min) * (v - v_min) / (
v_max - v_min
)
if self.is_integer:
np.clip(
v,
a_min=np.ceil(self.min_val),
a_max=np.floor(self.max_val),
out=v,
)
v = np.round(v).astype(int)
            v = v.tolist()
if self.proportion_missing_values > 0:
                assert (
                    self.proportion_missing_values < 1.0
                ), "Please choose a number 0 < x < 1.0"
idx = np.arange(len(v))
state.shuffle(idx)
num_missing_values = (
int(len(v) * self.proportion_missing_values) + 1
) # Add one in case this gets zero
missing_idx = idx[:num_missing_values]
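                # Missing values are written either as None or as the string
                # "NaN" (presumably to exercise both encodings downstream).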
for j in missing_idx:
# Using convention that there are no missing values before
# the start date.
if j != 0:
v[j] = None if state.rand() < 0.5 else "NaN"
res.append(
dict(
start=pd.Period(start, freq=self.freq_str),
target=np.array(v),
item_id=i,
)
)
return res


class RecipeDataset(ArtificialDataset):
    """
    Synthetic dataset generated by providing a recipe.

    A recipe is either a (non-deterministic) function

        f(length: int, global_state: dict) -> dict

    or a list of (field, function) tuples of the form

        (field: str, f(data: dict, length: int, global_state: dict) -> dict)

    which is processed sequentially, with data initially set to {} and each
    entry updating data[field] to the output of the function call.
    """
def __init__(
self,
recipe: Union[
Callable, Dict[str, Callable], List[Tuple[str, Callable]]
],
metadata: MetaData,
max_train_length: int,
prediction_length: int,
num_timeseries: int,
trim_length_fun=lambda x, **kwargs: 0,
data_start=pd.Timestamp("2014-01-01"),
) -> None:
"""
:param recipe: The recipe to generate from (see class docstring)
:param metadata: The metadata to be included in the dataset
:param max_train_length: The maximum length of a training time series.
:param prediction_length: The length of the prediction range
:param num_timeseries: Number of time series to generate
        :param trim_length_fun: Callable f(x: DataEntry, **kwargs) -> int
            returning the number of leading time points to trim from each
            series
:param data_start: Start date for the data set
"""
super().__init__(freq=metadata.freq)
self.recipe = recipe
self._metadata = metadata
self.max_train_length = max_train_length
self.prediction_length = prediction_length
self.trim_length_fun = trim_length_fun
self.num_timeseries = num_timeseries
self.data_start = cast(
pd.Period, pd.Period(data_start, freq=self._metadata.freq)
)
@property
def metadata(self) -> MetaData:
return self._metadata
@property
def train(self):
raise NotImplementedError
@property
def test(self):
raise NotImplementedError

    def dataset_info(self, train_ds: Dataset, test_ds: Dataset) -> DatasetInfo:
return DatasetInfo(
name=f"RecipeDataset({repr(self.recipe)})",
metadata=self.metadata,
prediction_length=self.prediction_length,
train_statistics=calculate_dataset_statistics(train_ds),
test_statistics=calculate_dataset_statistics(test_ds),
)

    @staticmethod
    def trim_ts_item_end(x: DataEntry, length: int) -> DataEntry:
        """
        Trim a DataEntry into a training range by removing the last `length`
        time points from the target and dynamic features.
        """
y = dict(
item_id=x[FieldName.ITEM_ID],
start=x[FieldName.START],
target=x[FieldName.TARGET][:-length],
)
if FieldName.FEAT_DYNAMIC_CAT in x:
y[FieldName.FEAT_DYNAMIC_CAT] = x[FieldName.FEAT_DYNAMIC_CAT][
:, :-length
]
if FieldName.FEAT_DYNAMIC_REAL in x:
y[FieldName.FEAT_DYNAMIC_REAL] = x[FieldName.FEAT_DYNAMIC_REAL][
:, :-length
]
return y
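
    # e.g. for an entry whose target has length 30, trim_ts_item_end(x, 10)
    # keeps the first 20 time points (and the matching feature columns).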

    @staticmethod
    def trim_ts_item_front(x: DataEntry, length: int) -> DataEntry:
        """
        Trim a DataEntry into a training range by removing the first
        `length` time points from the target and dynamic features.
        """
assert length <= len(x[FieldName.TARGET])
y = dict(
item_id=x[FieldName.ITEM_ID],
start=x[FieldName.START] + length * x[FieldName.START].freq,
target=x[FieldName.TARGET][length:],
)
if FieldName.FEAT_DYNAMIC_CAT in x:
y[FieldName.FEAT_DYNAMIC_CAT] = x[FieldName.FEAT_DYNAMIC_CAT][
:, length:
]
if FieldName.FEAT_DYNAMIC_REAL in x:
y[FieldName.FEAT_DYNAMIC_REAL] = x[FieldName.FEAT_DYNAMIC_REAL][
:, length:
]
return y

    def generate(self) -> TrainDatasets:
metadata = self.metadata
data_it = generate(
length=self.max_train_length + self.prediction_length,
recipe=self.recipe,
start=self.data_start,
)
full_length_data = take_as_list(data_it, self.num_timeseries)
test_data = [
RecipeDataset.trim_ts_item_front(
x, self.trim_length_fun(x, train_length=self.max_train_length)
)
for x in full_length_data
]
train_data = [
RecipeDataset.trim_ts_item_end(x, self.prediction_length)
for x in test_data
]
return TrainDatasets(
metadata=metadata,
train=ListDataset(train_data, metadata.freq),
test=ListDataset(test_data, metadata.freq),
)
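
    # Note: each train entry is the corresponding test entry with the last
    # prediction_length points removed, so the two splits agree on the
    # shared range.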


def default_synthetic() -> Tuple[DatasetInfo, Dataset, Dataset]:
recipe = [
(FieldName.TARGET, LinearTrend() + RandomGaussian()),
(FieldName.FEAT_STATIC_CAT, RandomCat([10])),
(
FieldName.FEAT_STATIC_REAL,
ForEachCat(RandomGaussian(1, (10,)), FieldName.FEAT_STATIC_CAT)
+ RandomGaussian(0.1, (10,)),
),
]
data = RecipeDataset(
recipe=recipe,
metadata=MetaData(
freq="D",
feat_static_real=[
BasicFeatureInfo(name=FieldName.FEAT_STATIC_REAL)
],
feat_static_cat=[
CategoricalFeatureInfo(
name=FieldName.FEAT_STATIC_CAT, cardinality=10
)
],
feat_dynamic_real=[
BasicFeatureInfo(name=FieldName.FEAT_DYNAMIC_REAL)
],
),
max_train_length=20,
prediction_length=10,
num_timeseries=10,
trim_length_fun=lambda x, **kwargs: np.minimum(
int(np.random.geometric(1 / (kwargs["train_length"] / 2))),
kwargs["train_length"],
),
)
generated = data.generate()
assert generated.test is not None
info = data.dataset_info(generated.train, generated.test)
return info, generated.train, generated.test


def constant_dataset() -> Tuple[DatasetInfo, Dataset, Dataset]:
metadata = MetaData(
freq="1H",
feat_static_cat=[
CategoricalFeatureInfo(
name="feat_static_cat_000", cardinality="10"
)
],
feat_static_real=[BasicFeatureInfo(name="feat_static_real_000")],
)
start_date = "2000-01-01 00:00:00"
train_ds = ListDataset(
data_iter=[
{
FieldName.ITEM_ID: str(i),
FieldName.START: start_date,
FieldName.TARGET: [float(i)] * 24,
FieldName.FEAT_STATIC_CAT: [i],
FieldName.FEAT_STATIC_REAL: [float(i)],
}
for i in range(10)
],
freq=metadata.freq,
)
test_ds = ListDataset(
data_iter=[
{
FieldName.ITEM_ID: str(i),
FieldName.START: start_date,
FieldName.TARGET: [float(i)] * 30,
FieldName.FEAT_STATIC_CAT: [i],
FieldName.FEAT_STATIC_REAL: [float(i)],
}
for i in range(10)
],
freq=metadata.freq,
)
info = DatasetInfo(
name="constant_dataset",
metadata=metadata,
prediction_length=2,
train_statistics=calculate_dataset_statistics(train_ds),
test_statistics=calculate_dataset_statistics(test_ds),
)
return info, train_ds, test_ds
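

if __name__ == "__main__":
    # Minimal smoke test (a sketch, not part of the library API): build the
    # fixed constant dataset defined above and inspect the first training
    # entry.
    info, train_ds, test_ds = constant_dataset()
    print("dataset:", info.name, "prediction_length:", info.prediction_length)
    entry = next(iter(train_ds))
    print(
        "first item:",
        entry[FieldName.ITEM_ID],
        "target length:",
        len(entry[FieldName.TARGET]),
    )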