Source code for mlnext.anomaly

"""Module for analyzing anomalies.
"""
import typing as T
import warnings
from operator import ge
from operator import gt

import numpy as np
import pandas as pd

from .data import detemporalize
from .utils import check_ndim
from .utils import check_shape
from .utils import truncate

__all__ = [
    'find_anomalies',
    'recall_anomalies',
    'rank_features',
    'apply_point_adjust',
    'apply_point_adjust_score',
]


def find_anomalies(
    y: T.Union[np.ndarray, pd.DataFrame],
) -> T.List[T.Tuple[int, int]]:
    """Finds continuous segments of anomalies and returns a list of tuples
    with the start and end index of each anomaly.

    Args:
        y (T.Union[np.ndarray, pd.DataFrame]): Array of labels (1d).

    Returns:
        T.List[T.Tuple[int, int]]: Returns a list of tuples. A tuple
        consists of 2 elements with the start and end index of the anomaly.

    Example:
        >>> find_anomalies(np.array([0, 1, 1, 0, 1, 0, 1, 1]))
        [(1, 2), (4, 4), (6, 7)]
    """
    y = np.atleast_1d(np.array(y).squeeze())
    check_ndim(y, ndim=1)
    y = pd.Series(y)

    # true for index i when y[i - 1] = 0 and y[i] = 1
    start = (y > y.shift(1, fill_value=0))
    # true for index i when y[i] = 1 and y[i + 1] = 0
    end = (y > y.shift(-1, fill_value=0))

    # get indices where true
    start, end = np.flatnonzero(start), np.flatnonzero(end)

    return list(zip(start, end))
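
# A standalone sketch (illustrative values, not part of the public API) of
# the shift-based edge detection that ``find_anomalies`` builds on: a rising
# edge marks the start of a segment, a falling edge its end.
def _example_find_anomalies() -> None:
    y = pd.Series([0, 1, 1, 0, 1])
    starts = np.flatnonzero(y > y.shift(1, fill_value=0))  # rising edges
    ends = np.flatnonzero(y > y.shift(-1, fill_value=0))  # falling edges
    assert list(zip(starts, ends)) == [(1, 2), (4, 4)]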
def recall_anomalies(
    y: np.ndarray,
    y_hat: np.ndarray,
    *,
    k: float = 0,
) -> float:
    """Calculates the percentage of anomaly segments that are correctly
    detected. The parameter ``k`` (in %) controls how much of a segment
    needs to be detected for it to count as detected.

    Args:
        y (np.ndarray): Ground truth.
        y_hat (np.ndarray): Label predictions.
        k (float): Percentage ([0, 100]) of points that need to be
          detected in a segment for it to be counted. For ``k=0``, at
          least one point has to be detected. For ``k=100``, every point
          in the segment has to be correctly detected. Default: 0.

    Returns:
        float: Returns the fraction of detected anomaly segments.
    """
    y_hat, y = np.array(y_hat).squeeze(), np.array(y).squeeze()
    check_ndim(y_hat, y, ndim=1)
    check_shape(y_hat, y)

    if not (0 <= k <= 100):
        raise ValueError(f'k must be in [0, 100], got "{k}".')

    anomalies = find_anomalies(y)
    detected = _recall_anomalies(anomalies, y_hat, k=k)

    return detected / len(anomalies)
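
# A short usage sketch for ``recall_anomalies`` with made-up labels: two
# ground-truth segments, of which only the first is partially detected.
def _example_recall_anomalies() -> None:
    y = np.array([0, 1, 1, 1, 0, 1, 1, 0])
    y_hat = np.array([0, 1, 0, 0, 0, 0, 0, 0])
    # k=0: a single hit per segment suffices -> 1 of 2 segments detected
    assert recall_anomalies(y, y_hat, k=0) == 0.5
    # k=50: at least half of each segment must be hit -> no segment counts
    assert recall_anomalies(y, y_hat, k=50) == 0.0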
def _recall_anomalies(
    anomalies: T.List[T.Tuple[int, int]],
    y_hat: np.ndarray,
    *,
    k: float = 0,
) -> int:
    """Determines the number of detected segments for a given ``k``.

    Args:
        anomalies (T.List[T.Tuple[int, int]]): Start and end indices of
          the anomalies.
        y_hat (np.ndarray): Predictions.
        k (float): Percentage ([0, 100]) of points that need to be
          detected in a segment for it to be counted. For ``k=0``, at
          least one point has to be detected. For ``k=100``, every point
          in the segment has to be correctly detected. Default: 0.

    Returns:
        int: Returns the number of detected segments.
    """
    # determine the operator: for k = 0 use > (at least one point),
    # otherwise >= (at least k% of the points)
    _op = gt if k == 0 else ge

    detected = np.sum([
        _op(np.sum(y_hat[s:(e + 1)]), (k / 100) * (e + 1 - s))
        for s, e in anomalies
    ])

    return detected
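
# A worked example for the operator choice above (illustrative numbers): a
# segment of length 4 with 2 detected points passes k=50 (2 >= 2) but not
# k=75 (2 < 3); for k=0 any single detected point counts (2 > 0).
def _example_recall_threshold() -> None:
    y_hat = np.array([0, 1, 1, 0, 0])
    anomalies = [(1, 4)]  # one segment of length 4 with 2 detected points
    assert _recall_anomalies(anomalies, y_hat, k=0) == 1
    assert _recall_anomalies(anomalies, y_hat, k=50) == 1
    assert _recall_anomalies(anomalies, y_hat, k=75) == 0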
def rank_features(
    *,
    error: np.ndarray,
    y: np.ndarray,
) -> T.Tuple[T.List[T.Tuple[int, int]], np.ndarray, np.ndarray]:
    """Finds the anomalies in ``y`` and calculates the feature-wise error
    for each anomaly. The features are ranked according to their mean
    error during the anomaly.

    Args:
        error (np.ndarray): Error (2d or 3d).
        y (np.ndarray): Labels (1d).

    Raises:
        ValueError: Raised if ``error`` has fewer than 2 features or if
          no anomalies were found in ``y``.

    Returns:
        T.Tuple[T.List[T.Tuple[int, int]], np.ndarray, np.ndarray]:
        Returns a tuple of

        1. a list of tuples where each tuple contains the start and end
           index of an anomaly,
        2. a 2d array where each row contains the ranked feature indices,
        3. a 2d array where each row contains the mean errors for the
           features in the order of 2.

    Example:
        >>> errors = np.array([[0.1, 0.8, 0.3, 0.25],
        ...                    [0.2, 0.4, 0.2, 0.6]]).T
        >>> y = np.array([0, 1, 0, 1])
        >>> segments, rankings, mean_errors = rank_features(
        ...     error=errors, y=y)
        >>> segments
        [(1, 1), (3, 3)]
        >>> rankings
        [[0, 1], [1, 0]]
        >>> mean_errors
        [[0.8, 0.4], [0.6, 0.25]]
    """
    # 1. find all anomalies in y
    # 2. compute the average error per feature per segment
    # 3. rank the features according to their average error per anomaly
    error = detemporalize(error, verbose=False)
    y = detemporalize(y, verbose=False)

    if (e_len := error.shape[0]) != (y_len := y.shape[0]):
        warnings.warn(f'Length misaligned, got {e_len} and {y_len}.')
        (error, y), = truncate((error, y))

    if error.shape[-1] < 2:
        raise ValueError('Expected at least 2 features.')

    # get the anomaly segments
    anomalies = find_anomalies(y)

    if len(anomalies) < 1:
        raise ValueError('No anomalies found.')

    # calculate the mean error per feature for each anomaly and rank the
    # features according to their mean error during the anomaly
    errors = np.array([_sort_features(error, a) for a in anomalies])
    rankings = np.array(errors[:, :, 0], dtype='int32')
    mean_errors = errors[:, :, 1]

    return anomalies, rankings, mean_errors
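
# A small sketch of mapping the output of ``rank_features`` back to feature
# names; ``names`` is a hypothetical list of column names, the error values
# are taken from the docstring example above.
def _example_rank_features() -> None:
    errors = np.array([[0.1, 0.8, 0.3, 0.25], [0.2, 0.4, 0.2, 0.6]]).T
    y = np.array([0, 1, 0, 1])
    names = ['temperature', 'pressure']  # hypothetical feature names
    segments, rankings, _ = rank_features(error=errors, y=y)
    for (start, end), ranking in zip(segments, rankings):
        # the first entry of each ranking is the feature with the largest
        # mean error during that segment
        print(f'[{start}, {end}]: most anomalous: {names[ranking[0]]}')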
def _sort_features(
    error: np.ndarray,
    idx: T.Tuple[int, int],
) -> T.List[T.Tuple[int, float]]:
    """Calculates the mean error per feature for an anomaly.

    Args:
        error (np.ndarray): Errors.
        idx (T.Tuple[int, int]): Tuple of (start, end) indices of an
          anomaly.

    Returns:
        T.List[T.Tuple[int, float]]: Returns a list of tuples of feature
        index and mean error, sorted by the mean error for the anomaly in
        descending order.
    """
    # calculate the mean error by feature
    mean_err = np.mean(error[idx[0]:(idx[1] + 1)], axis=0)

    # rank the errors (tuples of (idx, mean_err))
    rank_err = sorted(
        enumerate(mean_err),
        key=lambda item: item[1],
        reverse=True,
    )

    return rank_err
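
# Tiny illustration of the sort above (illustrative values): feature 1 has
# the larger mean error over rows 0..1 and therefore ranks first.
def _example_sort_features() -> None:
    error = np.array([[0.25, 1.0], [0.75, 0.5]])
    # mean per feature: [0.5, 0.75] -> feature 1 first
    assert _sort_features(error, (0, 1)) == [(1, 0.75), (0, 0.5)]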
def apply_point_adjust(
    *,
    y_hat: np.ndarray,
    y: np.ndarray,
    k: float = 0,
) -> np.ndarray:
    """Implements the point-adjust approach from
    https://arxiv.org/abs/1802.03903 and its variation from
    https://arxiv.org/abs/2109.05257 (parameter ``k``). For a ground truth
    anomaly segment in ``y``:

    - ``k=0``: if any point ``x`` in the segment was classified as
      anomalous (``y_hat=1`` for ``x``),
    - ``0 < k < 100``: if more than (>) ``k``% of the points in the
      segment are classified as anomalous,
    - ``k=100``: if all points in the segment are classified as anomalous,

    then the labels of all observations in the segment are adjusted to
    ``y_hat=1``. For ``k=0`` it is equal to the original point-adjust, for
    ``k=100`` it is equal to the unadjusted F1.

    Args:
        y_hat (np.ndarray): Label predictions (1d).
        y (np.ndarray): Ground truth (1d).
        k (float): Percentage [0, 100] of points that have to be detected
          as an anomaly in a segment before an adjustment is made
          (Default: 0).

    Returns:
        np.ndarray: Returns the point-adjusted ``y_hat``.

    Example:
        >>> import mlnext
        >>> import numpy as np
        >>> mlnext.apply_point_adjust(
        ...     y_hat=np.array([1, 0, 0, 1, 0, 0, 0, 1, 1]),
        ...     y=np.array([0, 0, 1, 1, 1, 0, 1, 1, 0]))
        [1, 0, 1, 1, 1, 0, 1, 1, 1]

        >>> # for k=40: only the second segment is adjusted
        >>> mlnext.apply_point_adjust(
        ...     y_hat=np.array([1, 0, 0, 1, 0, 0, 0, 1, 1]),
        ...     y=np.array([0, 0, 1, 1, 1, 0, 1, 1, 0]),
        ...     k=40)
        [1, 0, 0, 1, 0, 0, 1, 1, 1]
    """
    y, y_hat = np.array(y).squeeze(), np.array(y_hat).squeeze()
    check_ndim(y, y_hat, ndim=1)

    if y_hat.shape != y.shape:
        warnings.warn(f'Shapes unaligned {y_hat.shape} and {y.shape}.')
        (y_hat, y), = truncate((y_hat, y))

    y_hat = np.copy(y_hat)

    if k < 0 or k > 100:
        raise ValueError(f'Parameter k must be in [0, 100], but got: {k}.')

    for (start, end) in find_anomalies(y):
        s = np.s_[start:(end + 1)]
        # adjust the segment only if more than k% of its points are
        # classified as anomalous, otherwise the labels are left as is
        if np.sum(y_hat[s]) > (k * (end + 1 - start)) / 100:
            y_hat[s] = 1

    return y_hat
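
# A brief check of the boundary cases stated above (made-up labels): with
# k=100 a partially detected segment stays untouched, matching the claim
# that k=100 reduces to the unadjusted predictions.
def _example_apply_point_adjust() -> None:
    y = np.array([0, 1, 1, 1, 0])
    y_hat = np.array([0, 0, 1, 0, 0])
    adjusted = apply_point_adjust(y_hat=y_hat, y=y, k=0)
    assert list(adjusted) == [0, 1, 1, 1, 0]  # one hit adjusts the segment
    unchanged = apply_point_adjust(y_hat=y_hat, y=y, k=100)
    assert list(unchanged) == [0, 0, 1, 0, 0]  # partial detection ignored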
def apply_point_adjust_score(
    *,
    y_score: np.ndarray,
    y: np.ndarray,
    k: float = 0,
) -> np.ndarray:
    """Implements the point-adjust approach from
    https://arxiv.org/pdf/1802.03903.pdf and its variation from
    https://arxiv.org/abs/2109.05257 (parameter ``k``) for prediction
    scores. For a ground truth anomaly segment in ``y``:

    - ``k=0``: the scores of all points are adjusted to the maximum score
      in the segment,
    - ``0 < k < 100``: the adjustment score is chosen such that at least
      ``k``% of the points in the segment have a higher score; only the
      points below the chosen score are adjusted to it,
    - ``k=100``: no adjustment is made.

    For ``k=0`` it is equal to the original point-adjust, for ``k=100`` it
    is equal to the unadjusted F1. This method allows the usage of the
    point-adjust method in conjunction with precision-recall and other
    similar curves.

    Args:
        y_score (np.ndarray): Prediction scores in range [0, 1] (1d).
        y (np.ndarray): Ground truth (1d).
        k (float): Percentage [0, 100] of points in a segment that must
          score higher than the adjustment score (Default: 0).

    Returns:
        np.ndarray: Returns the adjusted ``y_score``.

    Example:
        >>> import numpy as np
        >>> import mlnext
        >>> mlnext.apply_point_adjust_score(
        ...     y_score=np.array(
        ...         [0.1, 0.4, 0.6, 0.7, 0.4, 0.2, 0.4, 0.6, 0.25]),
        ...     y=np.array([0, 0, 1, 1, 1, 0, 1, 1, 0]),
        ...     k=0)
        [0.1, 0.4, 0.7, 0.7, 0.7, 0.2, 0.6, 0.6, 0.25]

        >>> # for k=40: both segments are adjusted
        >>> mlnext.apply_point_adjust_score(
        ...     y_score=np.array(
        ...         [0.1, 0.4, 0.6, 0.7, 0.4, 0.2, 0.4, 0.6, 0.25]),
        ...     y=np.array([0, 0, 1, 1, 1, 0, 1, 1, 0]),
        ...     k=40)
        [0.1, 0.4, 0.6, 0.7, 0.6, 0.2, 0.6, 0.6, 0.25]
    """
    y, y_score = np.array(y).squeeze(), np.array(y_score).squeeze()
    check_ndim(y, y_score, ndim=1)

    if y_score.shape != y.shape:
        warnings.warn(f'Shapes unaligned {y_score.shape} and {y.shape}.')
        (y_score, y), = truncate((y_score, y))

    y_score = np.copy(y_score)

    if k < 0 or k > 100:
        raise ValueError(f'Parameter k must be in [0, 100], but got: {k}.')

    for (start, end) in find_anomalies(y):
        s = np.s_[start:(end + 1)]

        # find the score such that at least k% of the points in the
        # segment score higher
        length = (end + 1) - start
        index = min(int(np.floor((length * k) / 100)) + 1, length)
        score = np.sort(y_score[s])[-index]

        # adjust only the points below the chosen score; y_score[s] is a
        # view, so the assignment writes through to y_score
        mask = y_score[s] < score
        y_score[s][mask] = score

    return y_score
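
# A sketch of the intended use with a precision-recall curve, as mentioned
# in the docstring above; assumes scikit-learn is available (it is not
# imported by this module). Scores and labels are illustrative.
def _example_adjusted_pr_curve() -> None:
    from sklearn.metrics import precision_recall_curve

    y = np.array([0, 0, 1, 1, 1, 0, 1, 1, 0])
    y_score = np.array([0.1, 0.4, 0.6, 0.7, 0.4, 0.2, 0.4, 0.6, 0.25])
    # adjust the scores once, then sweep thresholds on the adjusted scores
    adjusted = apply_point_adjust_score(y_score=y_score, y=y, k=0)
    precision, recall, thresholds = precision_recall_curve(y, adjusted)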