Source code for gluonts.nursery.anomaly_detection.supervised_metrics._precision_recall_utils

# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

from typing import Callable, Iterable, List, NamedTuple, Optional, Tuple

import numpy as np
from joblib import Parallel, delayed

from . import buffered_precision_recall
from .utils import labels_to_ranges


class PrecisionRecallAndWeights(NamedTuple):
    precisions: np.ndarray
    recalls: np.ndarray
    precision_weights: np.ndarray
    recall_weights: np.ndarray


def singleton_precision_recall(
    true_labels: np.ndarray,
    pred_labels: np.ndarray,
) -> Tuple[float, float]:
    """

    Parameters
    ----------
    true_labels
        Binary array of true labels
    pred_labels
        Binary array of predicted labels

    Returns
    -------
    precision: float
    recall: float
    """
    precision = 0.0
    recall = 0

    tp = np.sum(true_labels * pred_labels)
    true_cond_p = np.sum(true_labels)
    pred_cond_p = np.sum(pred_labels)

    if pred_cond_p > 0:
        precision = tp / pred_cond_p
    if true_cond_p > 0:
        recall = tp / true_cond_p

    return precision, recall
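
# Illustrative usage sketch (not part of the original module): pointwise
# precision/recall on toy arrays. With a single predicted positive that
# overlaps one of two true positives, precision is 1.0 and recall is 0.5.
#
# >>> true = np.array([0.0, 1.0, 1.0, 0.0])
# >>> pred = np.array([0.0, 1.0, 0.0, 0.0])
# >>> prec, reca = singleton_precision_recall(true, pred)  # prec == 1.0, reca == 0.5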


def precision_recall_curve_per_ts(
    labels: List[bool],
    scores: List[float],
    thresholds: np.ndarray,
    partial_filter: Optional[Callable] = None,
    singleton_curve: bool = False,
    precision_recall_fn: Callable = buffered_precision_recall,
) -> PrecisionRecallAndWeights:
    """
    Computes precision and recall values (pre-multiplied by their weights),
    together with the weights themselves, for a single time series at each of
    the given score thresholds.

    If `singleton_curve` is True, pointwise (singleton) precision and recall
    are computed; otherwise range-based precision and recall are computed via
    `precision_recall_fn`.
    """
    true_ranges = labels_to_ranges(labels)
    precisions = np.zeros(len(thresholds))
    recalls = np.zeros(len(thresholds))

    precision_weights, recall_weights = (
        np.zeros(len(thresholds)),
        np.zeros(len(thresholds)),
    )

    for ix, th in enumerate(thresholds):
        if partial_filter is None:
            pred_labels = np.asarray(scores) >= th
        else:
            pred_labels = partial_filter(th)

        if singleton_curve:
            true_labels_np = np.array(labels, dtype=float)
            pred_labels_np = np.array(pred_labels, dtype=float)
            _prec, _reca = singleton_precision_recall(
                true_labels_np, pred_labels_np
            )
            _prec_w, _reca_w = np.sum(pred_labels_np), np.sum(true_labels_np)
        else:
            pred_ranges = labels_to_ranges(pred_labels)
            _prec, _reca = precision_recall_fn(true_ranges, pred_ranges)
            _prec_w, _reca_w = len(pred_ranges), len(true_ranges)

        precisions[ix] += _prec * _prec_w
        recalls[ix] += _reca * _reca_w

        precision_weights[ix] += _prec_w
        recall_weights[ix] += _reca_w

    return PrecisionRecallAndWeights(
        precisions, recalls, precision_weights, recall_weights
    )
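
# Illustrative usage sketch (not part of the original module): weighted
# precision/recall contributions of one series at two hard thresholds.
# `scores` is assumed to be a numpy array aligned with `labels`; the returned
# weights count predicted / true ranges (or points, if `singleton_curve=True`)
# and are used to normalize across series.
#
# >>> labels = [False, True, True, False, False]
# >>> scores = np.array([0.1, 0.8, 0.9, 0.3, 0.2])
# >>> pr = precision_recall_curve_per_ts(labels, scores, np.array([0.5, 0.95]))
# >>> pr.precisions / np.maximum(pr.precision_weights, 1)  # per-threshold precision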


def aggregate_precision_recall_curve(
    label_score_iterable: Iterable,
    thresholds: Optional[np.ndarray] = None,
    partial_filter: Optional[Callable] = None,
    singleton_curve: bool = False,
    precision_recall_fn: Callable = buffered_precision_recall,
    n_jobs: int = -1,
):
    """
    Computes aggregate range-based precision-recall curves over a data set,
    iterating over individual time series. Optionally takes a partially
    constructed filter that converts given scores/thresholds to anomaly
    labels. See `gluonts.nursery.anomaly_detection.supervised_metrics.filters`
    for example filters.

    Parameters
    ----------
    label_score_iterable: Iterable
        An iterable that gives 2-tuples of np.arrays (of identical length),
        corresponding to `true_labels` and `pred_scores` respectively.
    thresholds: np.ndarray
        An np.array of score thresholds for which to compute precision-recall
        values. If the `partial_filter` argument is provided, these are the
        threshold values of the filter. If not, they will be applied as a
        single-step hard threshold to predicted scores.
    partial_filter: Callable
        Partial constructor for a "filter" object. If provided, this function
        can be called with a "score_threshold" to return labels used for
        precision and recall computation. If not provided, labels will be
        assigned with a hard threshold. See
        `gluonts.nursery.anomaly_detection.supervised_metrics.filters` for
        example filters.
    singleton_curve: bool
        If True, pointwise (singleton) precision and recall are computed
        instead of range-based precision and recall.
    precision_recall_fn: Callable
        Function to call in order to get the precision, recall metrics.
    n_jobs: int
        How many concurrent workers to use for parallelization; the default
        is -1 (use all available CPUs).

    Returns
    -------
    (Same as output of `sklearn.metrics.precision_recall_curve`)

    precision : array, shape = [n_thresholds + 1]
        Precision values such that element i is the precision of predictions
        with score >= thresholds[i] and the last element is 1.
    recall : array, shape = [n_thresholds + 1]
        Decreasing recall values such that element i is the recall of
        predictions with score >= thresholds[i] and the last element is 0.
    thresholds : array, shape = [n_thresholds <= len(np.unique(scores))]
        Increasing thresholds on the decision function used to compute
        precision and recall.
    """
    if thresholds is None:
        thresholds = np.unique(
            np.concatenate([scores for _, scores in label_score_iterable])
        )

    all_metrics = Parallel(n_jobs=n_jobs, verbose=10)(
        delayed(precision_recall_curve_per_ts)(
            labels,
            scores,
            thresholds,
            partial_filter,
            singleton_curve,
            precision_recall_fn,
        )
        for labels, scores in label_score_iterable
    )

    (
        all_precisions,
        all_recalls,
        all_precision_weights,
        all_recall_weights,
    ) = zip(*all_metrics)

    precisions = np.sum(all_precisions, axis=0)
    recalls = np.sum(all_recalls, axis=0)
    precision_weights = np.sum(all_precision_weights, axis=0)
    recall_weights = np.sum(all_recall_weights, axis=0)

    # normalize
    with np.errstate(divide="ignore", invalid="ignore"):
        precisions = np.where(
            precision_weights > 0, precisions / precision_weights, 0.0
        )
        recalls = np.where(recall_weights > 0, recalls / recall_weights, 0.0)

    # Start from the latest threshold where the full recall is attained.
    perfect_recall_ixs = np.where(recalls == 1.0)[0]
    first_ind = perfect_recall_ixs[-1] if len(perfect_recall_ixs) > 0 else 0

    return (
        np.r_[precisions[first_ind:], 1],
        np.r_[recalls[first_ind:], 0],
        thresholds[first_ind:],
    )
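
# Illustrative usage sketch (not part of the original module): aggregating the
# curve over a small in-memory data set. Labels are given as boolean lists and
# scores as numpy arrays here. Note that when `thresholds` is None the iterable
# is traversed twice, so a list (rather than a generator) is assumed; n_jobs=1
# keeps the toy example single-process.
#
# >>> dataset = [
# ...     ([False, False, True, True, False], np.array([0.1, 0.2, 0.9, 0.8, 0.1])),
# ...     ([False, True, True, False, False], np.array([0.2, 0.7, 0.6, 0.1, 0.0])),
# ... ]
# >>> precision, recall, thresholds = aggregate_precision_recall_curve(
# ...     dataset, n_jobs=1
# ... )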


def aggregate_precision_recall(
    labels_pred_iterable: Iterable,
    precision_recall_fn: Callable = buffered_precision_recall,
) -> Tuple[float, float]:
    """
    Computes aggregate range-based precision recall metrics for the given
    prediction labels.

    Parameters
    ----------
    labels_pred_iterable
        An iterable that gives 2-tuples of boolean lists corresponding to
        `true_labels` and `pred_labels` respectively.
    precision_recall_fn
        Function to call in order to get the precision, recall metrics.

    Returns
    -------
    A tuple containing average precision and recall in that order.
    """
    total_prec, total_reca, total_prec_w, total_reca_w = 0.0, 0.0, 0.0, 0.0

    for true_labels, pred_labels in labels_pred_iterable:
        true_ranges = labels_to_ranges(true_labels)
        pred_ranges = labels_to_ranges(pred_labels)

        _prec, _reca = precision_recall_fn(true_ranges, pred_ranges)
        _prec_w, _reca_w = len(pred_ranges), len(true_ranges)

        total_prec += _prec * _prec_w
        total_prec_w += _prec_w
        total_reca += _reca * _reca_w
        total_reca_w += _reca_w

    return (
        total_prec / total_prec_w if total_prec_w > 0 else 0,
        total_reca / total_reca_w if total_reca_w > 0 else 0,
    )
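
# Illustrative usage sketch (not part of the original module): averaging
# range-based precision/recall over already-thresholded label sequences.
#
# >>> labels_and_preds = [
# ...     ([False, True, True, False], [False, True, False, False]),
# ...     ([False, False, True, True], [False, False, True, True]),
# ... ]
# >>> avg_precision, avg_recall = aggregate_precision_recall(labels_and_preds)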