# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
import functools
import itertools
import operator
import textwrap
from pydoc import locate
from types import SimpleNamespace
from typing import (
Any,
Callable,
Dict,
Iterator,
List,
Optional,
Sequence,
Tuple,
Union,
cast,
)
import numpy as np
import pandas as pd
from gluonts.core.component import validated
from gluonts.dataset.common import DataEntry
ValueOrCallable = Union[Any, Callable]
Recipe = Union[
Callable, List[Callable], List[Tuple[str, Callable]], Dict[str, Callable]
]
Env = Dict[str, Any]
def resolve(val_or_callable: ValueOrCallable, context: Env, *args, **kwargs):
if callable(val_or_callable):
key = f"_ref_{id(val_or_callable)}"
if key not in context:
r = val_or_callable(context, *args, **kwargs)
context[key] = r
if "field_name" in kwargs:
context[kwargs["field_name"]] = r
return context[key]
elif isinstance(val_or_callable, str):
return context[val_or_callable]
elif isinstance(val_or_callable, list):
return [resolve(v, context, *args, **kwargs) for v in val_or_callable]
elif isinstance(val_or_callable, tuple):
return tuple(
resolve(v, context, *args, **kwargs) for v in val_or_callable
)
else:
return val_or_callable
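# A minimal sketch of how resolve() treats the three kinds of values
# (the names below are hypothetical, not part of the module):
#
#     ctx: Env = {"noise": np.array([0.1, -0.2])}
#     resolve("noise", ctx, length=2)            # string -> looked up in ctx
#     resolve(3.5, ctx, length=2)                # plain value -> returned as-is
#     resolve(ConstantVec(2.0), ctx, length=2)   # callable -> called once, result cached in ctx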
def generate(
length: int,
recipe: Recipe,
start: pd.Period,
global_state: Optional[dict] = None,
seed: int = 0,
item_id_prefix: str = "",
) -> Iterator[DataEntry]:
np.random.seed(seed)
if global_state is None:
global_state = {}
if isinstance(recipe, (dict, list)):
for x in itertools.count():
data = evaluate(recipe, length=length, global_state=global_state)
yield dict(**data, item_id=item_id_prefix + str(x), start=start)
else:
        assert callable(
            recipe
        ), "recipe must be a dict, a list, or a callable"
for x in itertools.count():
data = recipe(length=length, global_state=global_state)
            assert isinstance(
                data, dict
            ), "a callable recipe passed to generate must return a dict"
yield dict(**data, item_id=item_id_prefix + str(x), start=start)
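# Usage sketch for generate() with a dictionary recipe (field names and
# parameters are illustrative only):
#
#     recipe = {
#         "target": RandomGaussian() + LinearTrend(slope=2.0),
#         "feat_dynamic_real": Stack([ConstantVec(1.0), RandomBinary(prob=0.3)]),
#     }
#     entries = take_as_list(
#         generate(
#             length=50,
#             recipe=recipe,
#             start=pd.Period("2021-01-01", freq="D"),
#         ),
#         num=3,
#     )
#     # each entry is a dict with "target", "feat_dynamic_real", "item_id", "start"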
def evaluate(
recipe: Recipe,
length: ValueOrCallable,
*args,
    global_state: Optional[dict] = None,
**kwargs,
) -> Any:
if global_state is None:
global_state = {}
context = {}
if "length" in kwargs:
del kwargs["length"]
if "field_name" in kwargs:
del kwargs["field_name"]
if "global_state" in kwargs:
del kwargs["global_state"]
if "context" in kwargs:
context = kwargs["context"]
del kwargs["context"]
length_value = resolve(
length,
context,
length=None,
global_state=global_state,
*args,
**kwargs,
)
    # convert the legacy list-of-(name, function) format into a dict
if isinstance(recipe, list) and len(recipe) > 0:
elem = recipe[0]
if (
isinstance(elem, (tuple, list))
and len(elem) == 2
and isinstance(elem[0], str)
):
recipe = cast(List[Tuple[str, Callable]], recipe)
recipe = dict(recipe)
if isinstance(recipe, dict):
data: DataEntry = {}
for k, f in recipe.items():
try:
data[k] = resolve(
f,
context,
length=length_value,
field_name=k,
global_state=global_state,
*args,
**kwargs,
)
except ValueError as e:
raise ValueError(f'Error while evaluating key "{k}"') from e
return data
elif callable(recipe):
return resolve(
recipe,
context,
length=length_value,
global_state=global_state,
*args,
**kwargs,
)
elif isinstance(recipe, list):
result = []
for i, ri in enumerate(recipe):
try:
result.append(
resolve(
ri,
context,
length=length_value,
global_state=global_state,
*args,
**kwargs,
)
)
except ValueError as e:
raise ValueError(
f"Error when evaluating entry {i} or recipe"
) from e
return result
else:
raise ValueError(f"Invalid type for recipe {recipe}")
def make_func(
length: int, recipe: Recipe, global_state=None
) -> Callable[[int, Env], DataEntry]:
if global_state is None:
global_state = {}
def f(length=length, global_state=global_state, *args, **kwargs):
return evaluate(
recipe, length=length, global_state=global_state, *args, **kwargs
)
return f
def take_as_list(iterator, num):
return list(itertools.islice(iterator, num))
class Debug:
@validated()
def __init__(self, print_global=False) -> None:
self.print_global = print_global
def __call__(self, x: Env, global_state, **kwargs):
print(x)
if self.print_global:
print(global_state)
return 0
class Lifted:
num_outputs: int = 1
def __add__(self, other):
return _LiftedBinaryOp(self, other, "+")
def __radd__(self, other):
return _LiftedBinaryOp(other, self, "+")
def __sub__(self, other):
return _LiftedBinaryOp(self, other, "-")
def __rsub__(self, other):
return _LiftedBinaryOp(other, self, "-")
def __mul__(self, other):
return _LiftedBinaryOp(self, other, "*")
def __rmul__(self, other):
return _LiftedBinaryOp(other, self, "*")
def __truediv__(self, other):
return _LiftedBinaryOp(self, other, "/")
def __rtruediv__(self, other):
return _LiftedBinaryOp(other, self, "/")
def __pow__(self, other):
return _LiftedBinaryOp(self, other, "**")
def __rpow__(self, other):
return _LiftedBinaryOp(other, self, "**")
def __and__(self, other):
return _LiftedBinaryOp(self, other, "&")
def __rand__(self, other):
return _LiftedBinaryOp(other, self, "&")
def __or__(self, other):
return _LiftedBinaryOp(self, other, "|")
def __ror__(self, other):
return _LiftedBinaryOp(other, self, "|")
def __xor__(self, other):
return _LiftedBinaryOp(self, other, "^")
def __rxor__(self, other):
return _LiftedBinaryOp(other, self, "^")
def __gt__(self, other):
return _LiftedBinaryOp(self, other, ">")
def __ge__(self, other):
return _LiftedBinaryOp(self, other, ">=")
def __lt__(self, other):
return _LiftedBinaryOp(self, other, "<")
def __le__(self, other):
return _LiftedBinaryOp(self, other, "<=")
def __eq__(self, other):
return _LiftedBinaryOp(self, other, "==")
def __ne__(self, other):
return _LiftedBinaryOp(self, other, "!=")
def __iter__(self):
for i in range(self.num_outputs):
yield _LiftedUnpacked(self, i)
def __call__(
self,
x: Env,
length: int,
field_name: str,
global_state: Dict,
*args,
**kwargs,
):
pass
def expand_shape(s, length):
if isinstance(s, int):
s = (s,)
if s is None or len(s) == 0:
return s
s = np.array(s)
if np.any(s == 0):
s[s == 0] = length
return tuple(s)
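# A zero in the shape acts as a placeholder for the series length, e.g.:
#
#     expand_shape((0,), 10)    # -> (10,)
#     expand_shape((2, 0), 10)  # -> (2, 10)
#     expand_shape(3, 10)       # -> (3,)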
class NumpyFunc(Lifted):
@validated()
def __init__(
self,
func: str,
func_args: Tuple[Any, ...],
func_kwargs: Dict[str, Any],
):
self.func: Callable = locate(f"numpy.{func}") # type: ignore
self.func_args = func_args
self.func_kwargs = func_kwargs
def __call__(self, x: Env, length: int, *args, **kwargs):
func_args = [
resolve(u, x, length=length, *args, **kwargs)
for u in self.func_args
]
func_kwargs = {}
for k, v in self.func_kwargs.items():
v = resolve(v, x, length=length, *args, **kwargs)
if k in ["shape", "size"]:
v = expand_shape(v, length=length)
func_kwargs[k] = v
return self.func(*func_args, **func_kwargs)
lifted_numpy = SimpleNamespace()
lifted_numpy.random = SimpleNamespace()
_NUMPY_FUNC_NAMES = [
"abs",
"all",
"any",
"arange",
"arccos",
"arccosh",
"arcsin",
"arcsinh",
"arctan",
"arctan2",
"arctanh",
"argmax",
"argmin",
"ceil",
"clip",
"concatenate",
"convolve",
"cos",
"cosh",
"cumsum",
"cumprod",
"divmod",
"exp",
"floor",
"floor_divide",
"isfinite",
"isinf",
"isnan",
"linspace",
"log",
"log10",
"max",
"mean",
"mod",
"min",
"nan_to_num",
"nanargmax",
"nanargmin",
"nancumsum",
"nancumprod",
"nanmax",
"nanmean",
"nanmin",
"nanpercentile",
"nanquantile",
"nansum",
"nanstd",
"nanprod",
"nanvar",
"ones",
"ones_like",
"percentile",
"power",
"prod",
"quantile",
"random.beta",
"random.binomial",
"random.choice",
"random.chisquare",
"random.dirichlet",
"random.exponential",
"random.gamma",
"random.geometric",
"random.gumbel",
"random.hypergeometric",
"random.laplace",
"random.logistic",
"random.lognormal",
"random.logseries",
"random.multinomial",
"random.negative_binomial",
"random.normal",
"random.pareto",
"random.power",
"random.poisson",
"random.randn",
"random.randint",
"random.shuffle",
"random.uniform",
"random.vonmises",
"random.weibull",
"remainder",
"repeat",
"reshape",
"round",
"sin",
"sinh",
"shape",
"stack",
"std",
"sum",
"take",
"tan",
"tanh",
"unique",
"var",
"where",
"zeros",
"zeros_like",
]
for func_name in _NUMPY_FUNC_NAMES:
normalized_func_name = f"_np_shim_{func_name.replace('.', '_')}"
if normalized_func_name in globals():
continue
s = f"""
@functools.wraps(np.{func_name})
def {normalized_func_name}(*args, **kwargs):
return NumpyFunc('{func_name}', args, kwargs)
lifted_numpy.{func_name} = {normalized_func_name}
# disable numpy docstring tests
lifted_numpy.{func_name}.__doc__ = lifted_numpy.{func_name}.__doc__.replace('>>>', '>>')
""" # noqa: E501
exec(textwrap.dedent(s))
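# The generated shims are lazy: lifted_numpy.<fn>(...) returns a NumpyFunc node
# that only calls the underlying numpy function when the recipe is evaluated.
# Illustrative sketch:
#
#     walk = lifted_numpy.cumsum(RandomGaussian())   # a Lifted node, not an array
#     series = evaluate(walk, length=100)            # np.ndarray of shape (100,)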
class Length(Lifted):
@validated()
def __init__(self, l: ValueOrCallable = None):
self.l = l
def __call__(self, x: Env, length: int, *args, **kwargs):
l = resolve(self.l, x, length, *args, **kwargs)
if l is None:
assert length is not None, (
"Cannot get value for Length() when length is not provided in"
" evaluate"
)
return length
return len(l)
def lift(input: Union[int, Callable]):
"""
Use this decorator to lift a function.
@lift
def f(x, y, length=None)
    or, if your function returns more than one result,
@lift(2)
def f(x, y, length=None)
    You can then use your function as part of a recipe. The function is
    called with all arguments already resolved.
Note that you cannot serialize recipes that use the lift decorated
functions.
"""
if isinstance(input, int):
num_outs = input
else:
num_outs = 1
def w(f):
@functools.wraps(f)
def g(*f_args, **f_kwargs):
class Tmp(Lifted):
num_outputs = num_outs
@validated()
def __init__(self, f, f_args, f_kwargs):
self.f = f
self.f_args = f_args
self.f_kwargs = f_kwargs
def __call__(self, x: Env, length: int, *args, **kwargs):
resolved_f_args = [
resolve(a, x, length, *args, **kwargs)
for a in self.f_args
]
resolved_f_kwargs = {
k: resolve(v, x, length, *args, **kwargs)
for k, v in self.f_kwargs.items()
}
return self.f(
*resolved_f_args, **resolved_f_kwargs, length=length
)
return Tmp(f, f_args, f_kwargs)
return g
if isinstance(input, int):
return w
else:
return w(input)
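# Usage sketch for @lift (the function and field names are made up):
#
#     @lift
#     def noisy_scale(base, factor, length=None):
#         return factor * base + np.random.randn(length)
#
#     recipe = {
#         "base": LinearTrend(),
#         "target": noisy_scale("base", 5.0),   # "base" is resolved before the call
#     }
#     data = evaluate(recipe, length=100)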
class _LiftedUnpacked(Lifted):
@validated()
def __init__(self, base: Lifted, i: int):
self.base = base
self.i = i
def __call__(self, *args, **kwargs):
v = resolve(self.base, *args, **kwargs)
return v[self.i]
class _LiftedBinaryOp(Lifted):
@validated()
def __init__(self, left, right, op) -> None:
self.left = left
self.right = right
self.op = {
"+": operator.add,
"*": operator.mul,
"-": operator.sub,
"/": operator.truediv,
"**": operator.pow,
">": operator.gt,
">=": operator.ge,
"<": operator.lt,
"<=": operator.le,
"==": operator.eq,
"!=": operator.ne,
"|": operator.or_,
"&": operator.and_,
"^": operator.xor,
}[op]
def __call__(self, *args, **kwargs):
left = resolve(self.left, *args, **kwargs)
right = resolve(self.right, *args, **kwargs)
return self.op(left, right)
class RandomGaussian(Lifted):
@validated()
def __init__(
self, stddev: ValueOrCallable = 1.0, shape: Sequence[int] = (0,)
) -> None:
self.stddev = stddev
self.shape = shape
def __call__(self, x: Env, length: int, *args, **kwargs):
stddev = resolve(self.stddev, x, length, *args, **kwargs)
s = expand_shape(self.shape, length)
return stddev * np.random.randn(*s)
# Binary recipe that returns 1 if date is in holidays list and 0 otherwise
class BinaryHolidays(Lifted):
@validated()
# TODO: holidays is type List[datetime.date]
def __init__(self, dates: List[pd.Timestamp], holidays: List[Any]) -> None:
self.dates = dates
self.holidays = holidays
def __call__(self, *args, **kwargs):
length = len(self.dates)
out = np.ones(length)
for i, date in enumerate(self.dates):
            # Convert the Timestamp to a datetime.date to check membership in holidays
if date.date() in self.holidays:
out[i] = 1.0
else:
out[i] = 0.0
return out
class RandomBinary(Lifted):
@validated()
def __init__(self, prob: ValueOrCallable = 0.1) -> None:
self.prob = prob
def __call__(self, x: Env, length: int, *args, **kwargs):
prob = resolve(self.prob, x, length, *args, **kwargs)
return 1.0 * (np.random.rand(length) < prob)
class RandomSymmetricDirichlet(Lifted):
@validated()
def __init__(
self, alpha: ValueOrCallable = 1.0, shape: Sequence[int] = (0,)
) -> None:
self.alpha = alpha
self.shape = shape
def __call__(self, x, length, *args, **kwargs):
alpha = resolve(self.alpha, x, length, *args, **kwargs)
s = expand_shape(self.shape, length)
return np.random.dirichlet(alpha * np.ones(s))
class BinaryMarkovChain(Lifted):
@validated()
def __init__(
self, one_to_zero: ValueOrCallable, zero_to_one: ValueOrCallable
) -> None:
self.one_to_zero = one_to_zero
self.zero_to_one = zero_to_one
def __call__(self, x: Env, length: int, *args, **kwargs):
probs = np.zeros(2)
probs[0] = resolve(self.zero_to_one, x, length, *args, **kwargs)
probs[1] = resolve(self.one_to_zero, x, length, *args, **kwargs)
out = np.ones(length, dtype=int) # initial state is 1
uu = np.random.rand(length)
for i in range(1, length):
if uu[i] < probs[out[i - 1]]:
out[i] = 1 - out[i - 1]
else:
out[i] = out[i - 1]
return out
class Constant(Lifted):
@validated()
def __init__(self, constant) -> None:
self.constant = constant
def __call__(self, *args, **kwargs):
return self.constant
class ConstantVec(Lifted):
@validated()
def __init__(self, constant: ValueOrCallable) -> None:
self.constant = constant
def __call__(self, x: Env, length: int, *args, **kwargs):
constant = resolve(self.constant, x, length, *args, **kwargs)
return constant * np.ones(length)
class NormalizeMax(Lifted):
@validated()
def __init__(self, input) -> None:
self.input = input
def __call__(self, x: Env, *args, **kwargs):
inp = resolve(self.input, x, *args, **kwargs)
return inp / np.max(inp)
class OnesLike(Lifted):
@validated()
def __init__(self, other) -> None:
self.other = other
def __call__(self, x, length, *args, **kwargs):
other = resolve(self.other, x, length, **kwargs)
return np.ones_like(other)
class LinearTrend(Lifted):
@validated()
def __init__(self, slope: ValueOrCallable = 1.0) -> None:
self.slope = slope
def __call__(self, x, length, *args, **kwargs):
slope = resolve(self.slope, x, length, *args, **kwargs)
return slope * np.arange(length) / length
class RandomCat:
@validated()
def __init__(
self,
cardinalities: List[int],
prob_fun: Callable = RandomSymmetricDirichlet(alpha=1.0, shape=(0,)),
) -> None:
self.cardinalities = cardinalities
self.prob_fun = prob_fun
def __call__(self, x, field_name, global_state, **kwargs):
if field_name not in global_state:
probs = [self.prob_fun(x, length=c) for c in self.cardinalities]
global_state[field_name] = probs
probs = global_state[field_name]
cats = np.array(
[
np.random.choice(np.arange(len(probs[i])), p=probs[i])
for i in range(len(probs))
]
)
return cats
class Lag(Lifted):
@validated()
def __init__(
self,
input: ValueOrCallable,
lag: ValueOrCallable = 0,
pad_const: int = 0,
) -> None:
self.input = input
self.lag = lag
self.pad_const = pad_const
def __call__(self, x, *args, **kwargs):
feat = resolve(self.input, x, *args, **kwargs)
lag = resolve(self.lag, x, *args, **kwargs)
if lag > 0:
lagged_feat = np.concatenate(
(self.pad_const * np.ones(lag), feat[:-lag])
)
elif lag < 0:
lagged_feat = np.concatenate(
(feat[-lag:], self.pad_const * np.ones(-lag))
)
else:
lagged_feat = feat
return lagged_feat
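# Sketch: a target built from a lagged copy of a binary feature
# (field names and parameters are illustrative):
#
#     recipe = {
#         "flips": RandomBinary(prob=0.2),
#         "target": Lag("flips", lag=3) + RandomGaussian(stddev=0.1),
#     }
#     data = evaluate(recipe, length=30)   # data["target"] has shape (30,)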
class ForEachCat(Lifted):
@validated()
def __init__(self, fun, cat_field="cat", cat_idx=0) -> None:
self.fun = fun
self.cat_field = cat_field
self.cat_idx = cat_idx
def __call__(
self,
x: Env,
length: int,
field_name: str,
global_state: Dict,
*args,
**kwargs,
):
c = x[self.cat_field][self.cat_idx]
if field_name not in global_state:
global_state[field_name] = np.empty(
len(global_state[self.cat_field][self.cat_idx]),
dtype=object,
)
if global_state[field_name][c] is None:
global_state[field_name][c] = self.fun(
x, length=length, field_name=field_name, *args, **kwargs
)
return global_state[field_name][c]
class Eval(Lifted):
@validated()
def __init__(self, expr: str) -> None:
self.expr = expr
def __call__(self, x: Env, length: int, *args, **kwargs):
return eval(self.expr, globals(), dict(x=x, length=length, **kwargs))
class SmoothSeasonality(Lifted):
@validated()
def __init__(
self, period: ValueOrCallable, phase: ValueOrCallable
) -> None:
self.period = period
self.phase = phase
def __call__(self, x: Env, length: int, *args, **kwargs):
period = resolve(self.period, x, length, *args, **kwargs)
phase = resolve(self.phase, x, length, *args, **kwargs)
return (
np.sin(2.0 / period * np.pi * (np.arange(length) + phase)) + 1
) / 2.0
class Add(Lifted):
@validated()
def __init__(self, inputs: List[ValueOrCallable]) -> None:
self.inputs = inputs
def __call__(self, x: Env, length: int, *args, **kwargs):
return sum(resolve(k, x, length, *args, **kwargs) for k in self.inputs)
class Mul(Lifted):
@validated()
def __init__(self, inputs) -> None:
self.inputs = inputs
def __call__(self, x: Env, length: int, *args, **kwargs):
return functools.reduce(
operator.mul,
[resolve(k, x, length, *args, **kwargs) for k in self.inputs],
)
class NanWhere(Lifted):
@validated()
def __init__(
self, source: ValueOrCallable, nan_indicator: ValueOrCallable
) -> None:
self.source = source
self.nan_indicator = nan_indicator
def __call__(self, x: Env, length: int, *args, **kwargs):
source = resolve(self.source, x, length, *args, **kwargs)
nan_indicator = resolve(self.nan_indicator, x, length, *args, **kwargs)
out = source.copy()
out[nan_indicator == 1] = np.nan
return out
class OneMinus(Lifted):
@validated()
def __init__(self, source: ValueOrCallable) -> None:
self.source = source
def __call__(self, x: Env, length: int, *args, **kwargs):
value = resolve(self.source, x, length, *args, **kwargs)
return 1 - value
class Concatenate(Lifted):
@validated()
def __init__(self, inputs: List[ValueOrCallable], axis: int = 0) -> None:
self.inputs = inputs
self.axis = axis
def __call__(self, x: Env, length: int, *args, **kwargs):
inputs = [resolve(z, x, length, **kwargs) for z in self.inputs]
return np.concatenate(inputs, self.axis)
class Stack(Lifted):
@validated()
def __init__(self, inputs: List[ValueOrCallable]) -> None:
self.inputs = inputs
def __call__(self, x: Env, length: int, *args, **kwargs):
inputs = [resolve(z, x, length, **kwargs) for z in self.inputs]
return np.stack(inputs, axis=0)
class StackPrefix(Lifted):
@validated()
def __init__(self, prefix: str) -> None:
self.prefix = prefix
def __call__(self, x: Env, length: int, *args, **kwargs):
inputs = [v for k, v in x.items() if k.startswith(self.prefix)]
return np.stack(inputs, axis=0)
_LEGACY_WARNING_WAS_SHOWN = False
class Ref(Lifted):
@validated()
def __init__(self, field_name: str) -> None:
global _LEGACY_WARNING_WAS_SHOWN
if not _LEGACY_WARNING_WAS_SHOWN:
import warnings
warnings.warn(
"Ref is deprecated. Please use the functional api.",
)
_LEGACY_WARNING_WAS_SHOWN = True
self.field_name = field_name
def __call__(self, x: Env, length: int, *args, **kwargs):
return x[self.field_name]
class RandomInteger(Lifted):
@validated()
def __init__(
self,
low: ValueOrCallable,
high: ValueOrCallable,
shape: Optional[Sequence[int]] = (0,),
) -> None:
self.low = low
self.high = high
self.shape = shape
def __call__(self, x: Env, length: int, *args, **kwargs):
low = resolve(self.low, x, length, *args, **kwargs)
high = resolve(self.high, x, length, *args, **kwargs)
s = expand_shape(self.shape, length)
return np.random.randint(low, high, s)
class RandomChangepoints(Lifted):
@validated()
def __init__(self, max_num_changepoints: ValueOrCallable) -> None:
self.max_num_changepoints = max_num_changepoints
def __call__(self, x: Env, length: int, *args, **kwargs):
max_num_changepoints = resolve(
self.max_num_changepoints, x, length, *args, **kwargs
)
num_changepoints = np.random.randint(0, max_num_changepoints + 1)
change_idx: np.ndarray = np.sort(
np.random.randint(low=1, high=length - 1, size=(num_changepoints,))
        )  # type: ignore
change_ranges: np.ndarray = np.concatenate([change_idx, [length]])
out = np.zeros(length, dtype=int)
for i in range(num_changepoints):
out[change_ranges[i] : change_ranges[i + 1]] = i + 1
return out
class Repeated(Lifted):
@validated()
def __init__(self, pattern: ValueOrCallable) -> None:
self.pattern = pattern
def __call__(self, x: Env, length: int, *args, **kwargs):
pattern = resolve(self.pattern, x, length, **kwargs)
repeats = length // len(pattern) + 1
out = np.tile(pattern, (repeats,))
return out[:length]
class Convolve(Lifted):
@validated()
def __init__(
self, input: ValueOrCallable, filter: ValueOrCallable
) -> None:
self.filter = filter
self.input = input
def __call__(self, x: Env, length: int, *args, **kwargs):
fil = resolve(self.filter, x, length, **kwargs)
inp = resolve(self.input, x, length, **kwargs)
out = np.convolve(inp, fil, mode="same")
return out
class Dilated(Lifted):
@validated()
def __init__(self, source: Callable, dilation: int) -> None:
self.source = source
self.dilation = dilation
def __call__(self, x: Env, length: int, *args, **kwargs):
inner = self.source(x, length // self.dilation + 1, **kwargs)
out = np.repeat(inner, self.dilation)
return out[:length]
class ARp(Lifted):
def __init__(
self,
phi: ValueOrCallable,
sigma: ValueOrCallable,
xhist: ValueOrCallable = None,
c: ValueOrCallable = 0.0,
noise: ValueOrCallable = None,
):
"""
        Draw samples from an AR(p) process, parametrized as in
        https://en.wikipedia.org/wiki/Autoregressive_model#Graphs_of_AR(p)_processes
"""
self.phi = phi
self.sigma = sigma
self.c = c
self.xhist = xhist
self.noise = noise
def __call__(self, x: Env, length: int, *args, **kwargs):
from .ar_p import ar_p
phi = resolve(self.phi, x, length, *args, **kwargs)
phi = np.asarray(phi, dtype=np.float64)
assert phi.ndim == 1, "phi should be a 1d array"
assert len(phi) > 0, "phi should have length > 0"
sigma = resolve(self.sigma, x, length, *args, **kwargs)
sigma = float(sigma)
xhist = resolve(self.xhist, x, length, *args, **kwargs)
if xhist is None:
xhist = np.zeros_like(phi)
xhist = np.asarray(xhist, dtype=np.float64)
assert (
xhist.shape == phi.shape
), "xhist should have the same length as phi"
c = resolve(self.c, x, length, *args, **kwargs)
noise = resolve(self.noise, x, length, *args, **kwargs)
if noise is not None:
noise = np.asarray(noise, dtype=np.float64)
assert noise.ndim == 1, "the noise should be a 1d array"
assert (
len(noise) == length
), f"len(noise) should be be length={length}"
v = ar_p(
phi=phi,
sigma=sigma,
length=length,
xhist=xhist,
c=c,
noise=noise,
)
return v
def normalized_ar1(tau, x0=None, norm="minmax", sigma=1.0):
r"""
    Return an AR(1) process with an autocorrelation time of tau.

    norm can be:
        None       -> no normalization
        'minmax'   -> min-max scaled
        'standard' -> zero mean, unit variance
"""
assert norm in [None, "minmax", "standard"]
phi = lifted_numpy.exp(-1.0 / tau)
a = ARp(phi=[phi], xhist=[x0] if x0 is not None else None, sigma=sigma)
if norm is None:
return a
elif norm == "minmax":
amin = lifted_numpy.min(a)
amax = lifted_numpy.max(a)
return (a - amin) / (amax - amin)
elif norm == "standard":
return (a - lifted_numpy.mean(a)) / lifted_numpy.std(a)
else:
raise NotImplementedError()
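# Sketch: a slowly varying, min-max scaled AR(1) path (tau is illustrative):
#
#     series = evaluate(normalized_ar1(tau=20.0), length=200)
#     # values lie in [0, 1] due to the 'minmax' normalization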
class Choose(Lifted):
@validated()
def __init__(
self, options: ValueOrCallable, selector: ValueOrCallable
) -> None:
self.options = options
self.selector = selector
def __call__(self, x, length, **kwargs):
options = resolve(self.options, x, length, **kwargs)
selector = resolve(self.selector, x, length, **kwargs)
e = np.eye(options.shape[0])
out = np.sum(e[selector] * options.T, axis=1)
return out
class EvalRecipe(Lifted):
@validated()
def __init__(
self, recipe: Recipe, op: Optional[ValueOrCallable] = None
) -> None:
self.recipe = recipe
self.op = op
def __call__(self, x: Env, *args, **kwargs):
xx = evaluate(self.recipe, *args, **kwargs)
if self.op is not None:
return resolve(self.op, xx, *args, **kwargs)
else:
return xx