Source code for mlnext.pipeline

"""Module for data preprocessing."""
import datetime
import typing as T
import warnings
from functools import partial

import numpy as np
import pandas as pd
from deprecate import deprecated
from sklearn.base import BaseEstimator
from sklearn.base import OneToOneFeatureMixin
from sklearn.base import TransformerMixin
from sklearn.utils.validation import check_is_fitted
from sklearn.utils.validation import FLOAT_DTYPES

from .model import NewFeatureModel

__all__ = [
    'ColumnSelector',
    'ColumnDropper',
    'ColumnRename',
    'NaDropper',
    'Clip',
    'DatetimeTransformer',
    'NumericTransformer',
    'TimeframeExtractor',
    'DateExtractor',
    'ValueMapper',
    'Sorter',
    'Fill',
    'TimeOffsetTransformer',
    'ConditionedDropper',
    'ZeroVarianceDropper',
    'SignalSorter',
    'ColumnSorter',
    'DifferentialCreator',
    'ClippingMinMaxScaler',
    'FeatureCreator',
    'NewFeatureModel',
    'LengthTransformer',
    'RelativeTimeEncoder',
]


[docs] class ColumnSelector(BaseEstimator, TransformerMixin): """Transformer to select a list of columns by their name for further processing. If keys is None, then all columns from the fitted dataframe are kept. Args: columns (list[str] | None): Optional. List of columns to extract or None. If None, then the transformer must be fitted and only the columns present in the fitted dateframe are kept. Default: None. keys (list[str] | None): Same as columns. .. versionchanged:: 0.5.0 Changed default to None. If None, fit columns to keep on data. .. deprecated:: 0.5.0 Use columns instead. Example: >>> import mlnext >>> data = pd.DataFrame({'a': [0], 'b': [0]}) >>> mlnext.ColumnSelector(columns=['a']).transform(data) pd.DataFrame({'a': [0]}) >>> data = pd.DataFrame({'a': [0], 'b': [0]}) >>> t = mlnext.ColumnSelector().fit(pd.DataFrame({'a': [0]})) >>> t.transform(data) pd.DataFrame({'a': [0]}) """ @deprecated( True, args_mapping={'keys': 'columns'}, deprecated_in='0.5', remove_in='0.7', ) def __init__(self, columns: T.Optional[T.List[str]] = None): self._columns = columns if columns is not None: self.columns_ = columns def fit(self, X: pd.DataFrame, y=None): self.columns_ = ( X.columns.to_list() if self._columns is None else self._columns ) return self
[docs] def transform(self, X: pd.DataFrame) -> pd.DataFrame: """Extracts the columns from `X`. Args: X (pd.DataFrame): Dataframe. Returns: pd.DataFrame: Returns a DataFrame only containing the selected features. """ check_is_fitted(self, 'columns_') return X.loc[:, self.columns_]
[docs] class ColumnDropper(BaseEstimator, TransformerMixin): """Transformer to drop a list of ``columns`` by their name. Args: keys (list): T.List of columns names to drop. Example: >>> data = pd.DataFrame({'a': [0], 'b': [0]}) >>> ColumnDropper(columns=['b']).transform(data) pd.DataFrame({'a': [0]}) """ def __init__(self, *, columns: T.Sequence[str], verbose: bool = False): self.columns = set(columns) self.verbose = verbose def fit(self, X: pd.DataFrame, y=None): return self
[docs] def transform(self, X: pd.DataFrame) -> pd.DataFrame: """Drops a list of columns of `X`. Args: X (pd.DataFrame): Dataframe. Returns: pd.DataFrame: Returns the dataframe without the dropped features. """ cols = set(X.columns.to_list()) if len(m := self.columns - cols) > 0: warnings.warn(f'Columns {m} not found in dataframe.') if self.verbose: print( f'New columns: {cols - self.columns}. ' f'Removed: {self.columns}.' ) return X.drop(list(self.columns), axis=1, errors='ignore')
[docs] class ColumnRename(BaseEstimator, TransformerMixin): """Transformer to rename column with a ``mapper`` function. Args: mapper (lambda | dict[str, str]): Mapper rename function or 1-to-1 dict mapping of columns. .. versionchanged:: 0.5.0 Fixed type to indicate support for dict. Example: >>> data = pd.DataFrame({'a.b.c': [0], 'd.e.f': [0]}) >>> ColumnRename(lambda x: x.split('.')[-1]).transform(data) pd.DataFrame({'c': [0], 'f': [0]}) """ def __init__( self, mapper: T.Union[T.Dict[str, str], T.Callable[[str], str]], ): self.mapper = mapper def fit(self, X: pd.DataFrame, y=None): return self
[docs] def transform(self, X: pd.DataFrame) -> pd.DataFrame: """Renames a columns in `X` with a mapper function. Args: X (pd.DataFrame): Dataframe. Returns: pd.DataFrame: Returns the dataframe with the renamed columns. """ return X.rename(columns=self.mapper)
[docs] class NaDropper(BaseEstimator, TransformerMixin): """Transformer that drops rows with na values. Example: >>> data = pd.DataFrame({'a': [0, 1], 'b': [0, np.nan]}) >>> NaDropper().transform(data) pd.DataFrame({'a': [0], 'b': [0]}) """ def __init__(self): pass def fit(self, X: pd.DataFrame, y=None): return self
[docs] def transform(self, X: pd.DataFrame) -> pd.DataFrame: """Drops rows with na values. Args: X (pd.DataFrame): Dataframe. Returns: pd.DataFrame: Returns the new dataframe. """ return X.dropna()
[docs] class Clip(BaseEstimator, TransformerMixin): """Transformer that clips a ``columns`` to the treshold if the threshold is exceeded. Works with an ``upper`` and ``lower`` threshold. Wrapper for pd.DataFrame.clip. Args: columns (T.List[str], optional): Name of columns. If not provided, all columns in the fitted dataframe are used. lower (float, optional): lower limit. Defaults to 0. upper (float, optional): upper limit. Defaults to 1. Example: >>> data = pd.DataFrame({'a': [-0.1, 1.2], 'b': [0.5, 0.6]}) >>> Clip().transform(data) pd.DataFrame({'a': [0, 1], 'b': [0.5, 0.6]}) """ def __init__( self, *, columns: T.Optional[T.List[str]] = None, lower: T.Optional[float] = 0.0, upper: T.Optional[float] = 1.0, ): self._columns = columns self._lower = lower self._upper = upper def fit(self, X: pd.DataFrame, y=None): self.columns_ = ( self._columns if self._columns is not None else X.columns.to_list() ) return self
[docs] def transform(self, X: pd.DataFrame) -> pd.DataFrame: """Clips columns to a lower and upper threshold. Args: X (pd.DataFrame): Dataframe. Returns: pd.DataFrame: Returns the new dataframe. """ check_is_fitted(self, ['columns_']) X = X.copy() # check if columns in dataframe if len(diff := set(self.columns_) - set(X.columns)): raise ValueError( f'Columns {list(diff)} not found in DataFrame with columns ' f'{X.columns.to_list()}.' ) X[self.columns_] = X[self.columns_].clip( lower=self._lower, upper=self._upper, axis=0 ) return X
[docs] class ColumnTSMapper(BaseEstimator, TransformerMixin): def __init__( self, cols: T.List[str], timedelta: pd.Timedelta = pd.Timedelta(250, 'ms'), classes: T.Optional[T.List[str]] = None, verbose: bool = False, ): """Creates ColumnTSMapper. Expects the timestamp column to be of type pd.Timestamp. Args: cols (T.List[str]): names of [0] timestamp column, [1] sensor names, [2] sensor values. timedelta (pd.Timedelta): Timedelta to resample with. classes (T.List[str]): T.List of sensor names. verbose (bool, optional): Whether to allow prints. """ super().__init__() self._cols = cols self._timedelta = timedelta self._verbose = verbose if classes is not None: self.classes_ = classes
[docs] def fit(self, X, y=None): """Gets the unique values in the sensor name column that are needed to expand the dataframe. Args: X (pd.DataFrame): Dataframe. y (array-like, optional): Labels. Defaults to None. Returns: ColumnTSMapper: Returns this. """ classes = X[self._cols[1]].unique() self.classes_ = np.hstack(['Timestamp', classes]) return self
[docs] def transform(self, X): """Performs the mapping to equidistant timestamps. Args: X (pd.DataFrame): Dataframe. Raises: ValueError: Raised if column is not found in `X`. Returns: pd.DataFrame: Returns the remapped dataframe. """ # check is fit had been called check_is_fitted(self) # check if all columns exist if not all([item in X.columns for item in self._cols]): raise ValueError( f'Columns {self._cols} not found in DataFrame ' f'{X.columns.to_list()}.' ) # split sensors into individual columns # create new dataframe with all _categories # use timestamp index, to use resample later on # initialized with na sensors = pd.DataFrame( None, columns=self.classes_, index=X[self._cols[0]] ) # group by sensor groups = X.groupby([self._cols[1]]) # write sensor values to sensors which is indexed by the timestamp for g in groups: sensors.loc[g[1][self._cols[0]], g[0]] = g[1][ self._cols[2] ].to_numpy() sensors = sensors.apply(pd.to_numeric, errors='ignore') # fill na, important before resampling # otherwise mean affects more samples than necessary # first: forward fill to next valid observation # second: backward fill first missing rows sensors = sensors.fillna(method='ffill').fillna(method='bfill') # resamples to equidistant timeframe # take avg if multiple samples in the same timeframe sensors = sensors.resample(self._timedelta).mean() sensors = sensors.fillna(method='ffill').fillna(method='bfill') # FIXME: to avoid nans in model, but needs better fix sensors = sensors.fillna(value=0.0) # move index to column and use rangeindex sensors['Timestamp'] = sensors.index sensors.index = pd.RangeIndex(stop=sensors.shape[0]) if self._verbose: start, end = sensors.iloc[0, 0], sensors.iloc[-1, 0] print('ColumnTSMapper: ') print( f'{sensors.shape[0]} rows. ' f'Mapped to {self._timedelta.total_seconds()}s interval ' f'from {start} to {end}.' ) return sensors
[docs] class DatetimeTransformer(BaseEstimator, TransformerMixin): """Transforms a list of columns to ``datetime``. Args: columns (list): List of columns names. dt_format (str, optional): Optional format string. Example: >>> data = pd.DataFrame({'dt': ['2021-07-02 16:30:00']}) >>> data = DatetimeTransformer(columns=['dt']).transform(data) >>> data.dtypes dt datetime64[ns] """ def __init__( self, *, columns: T.List[str], dt_format: T.Optional[str] = None, ): super().__init__() self._columns = columns self._format = dt_format def fit(self, X, y=None): return self
[docs] def transform(self, X: pd.DataFrame) -> pd.DataFrame: """Parses ``columns`` to datetime. Args: X (pd.DataFrame): Dataframe. Raises: ValueError: Raised if columns are missing in `X`. Returns: pd.DataFrame: Returns the dataframe with datetime columns. """ X = X.copy() # check if columns in dataframe if len(diff := set(self._columns) - set(X.columns)): raise ValueError( f'Columns {list(diff)} not found in DataFrame with columns ' f'{X.columns.to_list()}.' ) # parse to pd.Timestamp X[self._columns] = X[self._columns].apply( lambda x: pd.to_datetime(x, format=self._format), axis=1 ) return X
[docs] class NumericTransformer(BaseEstimator, TransformerMixin): """Transforms ``columns`` to numeric datatypes with ``pd.to_numeric``. columns (list, optional): List of columns names. If None, then all columns from the fitted dataframe are transformed. Example: >>> data = pd.DataFrame({'a': [0], 'b': ['1']}) >>> data.dtypes a int64 b object >>> data = NumericTransformer().transform(data) >>> data.dtypes a int64 b int64 """ def __init__(self, *, columns: T.Optional[T.List[str]] = None): super().__init__() self._columns = columns def fit(self, X, y=None): self.columns_ = ( self._columns if self._columns is not None else X.columns ) return self
[docs] def transform(self, X): """Parses `columns` to numeric. Args: X (pd.DataFrame): Dataframe. Raises: ValueError: Raised if columns are missing in `X`. Returns: pd.DataFrame: Returns the dataframe with datetime columns. """ check_is_fitted(self, ['columns_']) X = X.copy() # check if columns in dataframe if len(diff := set(self.columns_) - set(X.columns)): raise ValueError( f'Columns {list(diff)} not found in DataFrame with columns ' f'{X.columns.to_list()}.' ) # parse to numeric X[self.columns_] = X[self.columns_].apply(pd.to_numeric, axis=1) return X
[docs] class TimeframeExtractor(BaseEstimator, TransformerMixin): """Drops samples that are not between a given ``start_time`` and ``end_time``. Limits are inclusive. Args: time_column (str): Column name of the datetime column. start_time (str, datetime.time): Start time. Can be parsed from a str. end_time (str, datetime.time): End time. Can be parsed from a str. invert(bool): Whether to invert the range. If True, then rows between ``start_time`` and ``end_time`` are removed. verbose (bool, optional): Whether to be verbose. Example: >>> data = pd.DataFrame( {'dates': [datetime.datetime(2021, 7, 2, 9, 50, 0), datetime.datetime(2021, 7, 2, 11, 0, 0), datetime.datetime(2021, 7, 2, 12, 10, 0)], 'values': [0, 1, 2]}) >>> TimeframeExtractor(time_column='dates', start_time= datetime.time(10, 0, 0), end_time=datetime.time(12, 0, 0) ).transform(data) pd.DataFrame({'dates': datetime.datetime(2021, 7, 2, 11, 0, 0), 'values': [1]}) """ def __init__( self, *, time_column: str, start_time: T.Union[str, datetime.time], end_time: T.Union[str, datetime.time], invert: bool = False, verbose: bool = False, ): super().__init__() if isinstance(start_time, str): start_time = pd.to_datetime(start_time).time() if isinstance(end_time, str): end_time = pd.to_datetime(end_time).time() self._start = start_time self._end = end_time self._column = time_column self._negate = invert self._verbose = verbose def fit(self, X: pd.DataFrame, y=None): return self
[docs] def transform(self, X: pd.DataFrame) -> pd.DataFrame: """Drops rows from the dataframe if they are not in between `start_time` and `end_time`. Limits are inclusive. Reindexes the dataframe. Args: X (pd.DataFrame): Dataframe. Returns: pd.DataFrame: Returns the new dataframe. """ X = X.copy() rows_before = X.shape[0] dates = pd.to_datetime(X[self._column]) if self._negate: X = X.loc[ ~( (dates.dt.time >= self._start) & (dates.dt.time <= self._end) ), :, ] else: X = X.loc[ (dates.dt.time >= self._start) & (dates.dt.time <= self._end), :, ] X.index = pd.RangeIndex(0, X.shape[0]) rows_after = X.shape[0] if self._verbose: print( 'TimeframeExtractor: \n' f'{rows_after} rows. Dropped {rows_before - rows_after} ' f'rows which are {"in" if self._negate else "not in"} between ' f'{self._start} and {self._end}.' ) return X
[docs] class DateExtractor(BaseEstimator, TransformerMixin): """Drops rows that are not between a start and end date. Limits are inclusive. Args: date_column (str): Column name of the datetime column. start_date (str, datetime.date): Start date. Can be parsed from a str. end_date (str, datetime.date): End date. Can be parsed from a str. invert(bool): Whether to invert the range. If True, then rows between ``start_date`` and ``end_date`` are removed. verbose (bool, optional): Whether to be verbose. Example: >>> data = pd.DataFrame( {'dates': [datetime.datetime(2021, 7, 1, 9, 50, 0), datetime.datetime(2021, 7, 2, 11, 0, 0), datetime.datetime(2021, 7, 3, 12, 10, 0)], 'values': [0, 1, 2]}) >>> DateExtractor(date_column='dates', start_date=datetime.date(2021, 7, 2), end_date=datetime.date(2021, 7, 2)).transform(data) pd.DataFrame({'dates': datetime.datetime(2021, 7, 2, 11, 0, 0), 'values': [1]}) """ def __init__( self, *, date_column: str, start_date: T.Union[str, datetime.date], end_date: T.Union[str, datetime.date], invert: bool = False, verbose: bool = False, ): """Initializes `DateExtractor`. Args: date_column (str): Name of timestamp column. start_date (datetime.date): Start date. end_date (datetime.date): End date. invert (bool): Whether to invert the range. verbose (bool, optional): Whether to allow prints. """ super().__init__() if isinstance(start_date, str): start_date = pd.to_datetime(start_date).date() if isinstance(end_date, str): end_date = pd.to_datetime(end_date).date() self._start = start_date self._end = end_date self._column = date_column self._negate = invert self._verbose = verbose def fit(self, X: pd.DataFrame, y=None): return self
[docs] def transform(self, X: pd.DataFrame) -> pd.DataFrame: """Drops rows which date is not between `start` and end date. Bounds are inclusive. Dataframe is reindexed. Args: X (pd.Dataframe): Dataframe. Returns: pd.Dataframe: Returns the new dataframe. """ rows_before = X.shape[0] dates = pd.to_datetime(X[self._column]) if self._negate: X = X.loc[ ~( (dates.dt.date >= self._start) & (dates.dt.date <= self._end) ), :, ] else: X = X.loc[ (dates.dt.date >= self._start) & (dates.dt.date <= self._end), :, ] X.index = pd.RangeIndex(0, X.shape[0]) rows_after = X.shape[0] if self._verbose: print( 'DateExtractor: \n' f'{rows_after} rows. Dropped {rows_before - rows_after} rows ' f'which are {"in" if self._negate else "not in"} between ' f'{self._start} and {self._end}.' ) return X
[docs] class ValueMapper(BaseEstimator, TransformerMixin): """Maps values in ``columns`` according to ``classes``. Wrapper for pd.DataFrame.replace. Args: columns (T.List[str]): Names of columns to remap. classes (T.Dict): Dictionary of old and new value. verbose (bool, optional): Whether to allow prints. Example: >>> data = pd.DataFrame({'a': [0.0, 1.0, 2.0]}) >>> ValueMapper(columns=['a'], classes={2.0: 1.0}).transform(data) pd.DataFrame({'a': [0.0, 1.0, 1.0]}) """ def __init__( self, *, columns: T.List[str], classes: T.Dict, verbose: bool = False ): super().__init__() self._columns = columns self._classes = classes self._verbose = verbose def fit(self, X: pd.DataFrame, y=None): return self
[docs] def transform(self, X: pd.DataFrame) -> pd.DataFrame: """Remaps values in `column` according to `classes`. Gives UserWarning if unmapped values are found. Args: X (pd.DataFrame): Dataframe. Returns: pd.DataFrame: Returns the new dataframe with remapped values. """ X = X.copy() # warning if unmapped values values = pd.unique(X[self._columns].values.ravel('K')) if not set(self._classes.keys()).issuperset(values): warnings.warn( f'Classes {set(self._classes.keys()) - set(values)} ignored.' ) X[self._columns] = X[self._columns].replace(self._classes) return X
[docs] class Sorter(BaseEstimator, TransformerMixin): """Sorts the dataframe by a list of columns. Wrapper for pd.DataFrame.sort_values. Args: columns (T.List[str]): List of column names to sort by. ascending (bool): Whether to sort ascending. Defaults to True. axis (int): Axis to sort by. Example: >>> data = pd.DataFrame({'a': [0, 1], 'b': [1, 0]}) >>> Sorter(columns=['b'], ascending=True).transform(data) pd.DataFrame({'a': [1, 0], 'b': [0, 1]}) """ def __init__( self, *, columns: T.Sequence[str], ascending: bool = True, axis: int = 0, ): super().__init__() self._columns = columns self._ascending = ascending self._axis = axis def fit(self, X: pd.DataFrame, y=None): return self
[docs] def transform(self, X: pd.DataFrame) -> pd.DataFrame: """Sorts ``X`` by ``columns``. Args: X (pd.DataFrame): Dataframe. Returns: pd.DataFrame: Returns the sorted Dataframe. """ X = X.copy() return X.sort_values( by=self._columns, ascending=self._ascending, axis=self._axis ) # type: ignore
[docs] class Fill(BaseEstimator, TransformerMixin): """Fills NA values with a constant or 'bfill' / 'ffill'. Wrapper for df.fillna. Args: value (T.Any): Constant to fill NAs. Defaults to None. method (str | None): method: 'ffill' or 'bfill'. Defaults to None. Example: >>> data = pd.DataFrame({'a': [0.0, np.nan]}) >>> Fill(value=1.0).transform(data) pd.DataFrame({'a': [0.0, 1.0]}) """ def __init__( self, *, value: T.Optional[T.Any] = None, method: T.Optional[T.Literal['ffill', 'bfill']] = None, ): super().__init__() self._value = value if method not in [None, 'ffill', 'bfill']: raise ValueError(f'Invalid method "{method}".') self._method = method def fit(self, X: pd.DataFrame, y=None): return self
[docs] def transform(self, X: pd.DataFrame) -> pd.DataFrame: """Fills NAs. Args: X (pd.DataFrame): Dataframe. Returns: pd.DataFrame: Returns the filled dataframe. """ X = X.copy() methods: T.Dict[T.Optional[str], T.Callable[..., pd.DataFrame]] = { 'ffill': X.ffill, 'bfill': X.bfill, None: partial(X.fillna, value=self._value), # type: ignore } return methods[self._method]()
[docs] class TimeOffsetTransformer(BaseEstimator, TransformerMixin): """Transformer that offsets a datetimes in ``time_colum`` by a given ``timedelta``. Args: time_column (T.List[str]): List of names of columns with timestamps to offset. timedelta (pd.Timedelta): Offset. Example: >>> data = pd.DataFrame( {'dates': [datetime.datetime(2021, 7, 1, 16, 0, 0)]}) >>> TimeOffsetTransformer(time_columns=['dates'], timedelta=pd.Timedelta(1, 'h') ).transform(data) pd.DataFrame({'dates': datetime.datetime(2021, 07, 2, 17, 0, 0)}) """ def __init__(self, *, time_columns: T.List[str], timedelta: pd.Timedelta): super().__init__() self._time_columns = time_columns self._timedelta = timedelta def fit(self, X: pd.DataFrame, y=None): return self
[docs] def transform(self, X: pd.DataFrame) -> pd.DataFrame: """Offsets the timestamps in ``time_columns`` by ``timedelta``. Args: X (pd.DataFrame): Dataframe. Returns: pd.DataFrame: Returns the dataframe. """ X = X.copy() for column in self._time_columns: X[column] = pd.to_datetime(X[column]) + self._timedelta return X
[docs] class ConditionedDropper(BaseEstimator, TransformerMixin): """Module to drop rows in ``column`` that contain numeric values and are above ``threshold``. If ``inverted`` is True, values below ``threshold`` are dropped. Args: column (str): Column to match condition in. threshold (float): Threshold. inverted (bool, optional): If false, all values below ``threshold`` are dropped, otherwise all values above are dropped. Example: >>> data = pd.DataFrame({'a': [0.0, 1.2, 0.5]}) >>> ConditionedDropper(column='a', threshold=0.5).transform(data) pd.DataFrame({'a': [0.0, 0.5]}) """ def __init__(self, *, column: str, threshold: float, invert: bool = False): super().__init__() self.column = column self.threshold = threshold self.inverted = invert def fit(self, X: pd.DataFrame, y=None): return self
[docs] def transform(self, X: pd.DataFrame) -> pd.DataFrame: """Drops rows if below or above a threshold. Args: X (pd.DataFrame): Dataframe. Returns: pd.DataFrame: Returns the dataframe. """ X = X.copy() if not self.inverted: X = X.drop(X[X[self.column] > self.threshold].index) else: X = X.drop(X[X[self.column] < self.threshold].index) return X.reset_index(drop=True)
[docs] class ZeroVarianceDropper(BaseEstimator, TransformerMixin): """Removes all columns that are numeric and have zero variance. Gives a warning if a column that was registered as zero variance deviates during transform. Args: verbose (bool, optional): Whether to print status messages. Example: >>> data = pd.DataFrame({'a': [0.0, 0.0], 'b': [1.0, 0.0]}) >>> ZeroVarianceDropper().fit_transform(data) pd.DataFrame({'b': [1.0, 0.0]}) """ def __init__(self, verbose: bool = False): super().__init__() self._verbose = verbose def _get_zero_variance_columns(self, X: pd.DataFrame) -> T.List[str]: """Finds all columns with zero variance. Args: X (pd.DataFrame): Dataframe. Returns: T.List[str]: Returns a list of column names. """ var = X.var() # get columns with zero variance return [str(k) for k, v in var.items() if v == 0.0]
[docs] def fit(self, X: pd.DataFrame, y=None): """Finds all columns with zero variance. Args: X (pd.DataFrame): Dataframe. y (array-like, optional): Labels. Defaults to None. Returns: ZeroVarianceDropper: Returns self. """ self.columns_ = self._get_zero_variance_columns(X) if self._verbose: print( f'Found {len(self.columns_)} columns with 0 variance ' f'({self.columns_}).' ) return self
[docs] def transform(self, X: pd.DataFrame) -> pd.DataFrame: """Drops all columns found by fit with zero variance. Args: X (pd.DataFrame): Dataframe. Returns: pd.DataFrame: Returns the new dataframe. """ check_is_fitted(self, 'columns_') X = X.copy() # check if columns match columns = self._get_zero_variance_columns(X) disj = {*columns} ^ {*self.columns_} if len(disj) > 0: warnings.warn(f'Found column with higher variance: {disj}.') before = X.shape[-1] X = X.drop(self.columns_, axis=1) if self._verbose: after = X.shape[-1] print(f'Dropped {before - after} columns.') return X
[docs] class SignalSorter(BaseEstimator, TransformerMixin): """Sorts the signals into continuous and binary signals. First the continuous, then the binary signals. Args: verbose (bool, optional): Whether to print status. Example: >>> data = pd.DataFrame({'a': [0.0, 1.0], 'b': [0.0, 0.2]}) >>> SignalSorter().fit_transform(data) pd.DataFrame({'b': [1.0, 0.0], 'a': [0.0, 1.0]}) """ def __init__(self, verbose: bool = False): super().__init__() self.verbose = verbose def fit(self, X, y=None): # find signals that are binary uniques = {col: self._is_binary(X[col]) for col in X.columns} self.order_ = sorted(uniques.items(), key=lambda v: v[1]) if self.verbose: print(f'Binary: {self.order_}') return self def _is_binary(self, X: pd.Series) -> bool: """ Args: X (pd.Series): Column of a data frame. Returns: bool: Whether ``X`` is a binary series. """ unique = X.unique() if len(unique) > 2: return False if len(unique) == 1: return True try: if set(unique.astype('float')) != {1.0, 0.0}: return False return True except Exception: return False
[docs] def transform(self, X: pd.DataFrame) -> pd.DataFrame: """Sorts ``X`` into to a block of continuous and binary signals. Args: X (pd.DataFrame): Dataframe. Returns: pd.DataFrame: Returns the sorted dataframe. """ check_is_fitted(self, []) X = X.copy() return X[[c[0] for c in self.order_]]
[docs] class ColumnSorter(BaseEstimator, TransformerMixin): """Sorts the dataframe in the same order as the fitted dataframe. Attributes: raise_on_error (bool): Whether to raise an exception if additional columns that were not fitted are found. verbose (bool): Whether to print the status. Example: >>> data = pd.DataFrame({'a': [0.0, 1.0], 'b': [0.0, 0.2]}) >>> (sorter := ColumnSorter()).fit(data) >>> sorter.transform(pd.DataFrame({'b': [0.2, 1.0], 'a': [0.0, 0.1]})) pd.DataFrame({'a': [0.0, 0.1], 'b': [0.2, 1.0]}) """ def __init__(self, *, raise_on_error: bool = True, verbose: bool = False): super().__init__() self.raise_on_error = raise_on_error self.verbose = verbose def fit(self, X, y=None): self.columns_ = X.columns.to_numpy() if self.verbose: print(f'Sorting in order {self.columns_}.') return self
[docs] def transform(self, X): """Sorts ``X`` by ``columns``. Args: X (pd.DataFrame): Dataframe. Returns: pd.DataFrame: Returns the sorted Dataframe. """ check_is_fitted(self) if len(diff := list(set(self.columns_) - set(X.columns))): raise ValueError(f'Columns missing: {diff}.') if len(diff := list(set(X.columns) - set(self.columns_))): if self.raise_on_error: raise ValueError(f'Found additional columns: {diff}.') else: warnings.warn(f'Found additional columns: {diff}.') return X.loc[:, self.columns_]
[docs] class DifferentialCreator(BaseEstimator, TransformerMixin): """Calculates signal differences between subsequent time points. Concatenates the new information with the dataframe and adds "_dif" as suffix to the created columns. Args: keys: T.List[str]: Columns to create derivatives from. Example: >>> data = pd.DataFrame({'a': [1.0, 2.0, 1.0]}) >>> dcreator = DifferentialCreator(columns=['a']) >>> dcreator.transform(pd.DataFrame(data) pd.DataFrame({'a': [1.0, 2.0, 1.0], 'a_dif': [1.0, -1.0, 0.0]}) """ def __init__(self, *, columns: T.List[str]): super().__init__() self._columns = columns def fit(self, X: pd.DataFrame, y=None): return self
[docs] def transform(self, X: pd.DataFrame) -> pd.DataFrame: """Calculate differences between subsequent points. Fill NaN with zero. Args: X (pd.DataFrame): Dataframe. Returns: pd.DataFrame: Returns the concatenated DataFrame. """ X_dif = X[self._columns].diff(axis=0).fillna(0).add_suffix('_dif') return pd.concat([X, X_dif], axis=1)
[docs] class ClippingMinMaxScaler( OneToOneFeatureMixin, BaseEstimator, TransformerMixin ): """Normalizes the fitted data to the interval ``feature_range``. The parameter ``p`` can be used to calculate the ``max`` value as the ``p``-th percentile of the fitted data, i.e., ``p``% of the data is below. Data which exceeds the limits of ``feature_range`` after the scaling can be clipped to specific values via a ``clip`` range. Args: feature_range (T.Tuple[float, float]): New feature min and max. Defaults to (0., 1.). clip (T.Tuple[float, float]): Range to clip values. Defaults to None. p (float): Percentile of data that is used as data maximum. Defaults to 100. copy (bool, optional): Whether to create a copy. Defaults to True. Example: >>> data = pd.DataFrame({'a': [1, 2, 3, 4]}) >>> scaler = mlnext.ClippingMinMaxScaler( ... feature_range=(0, 0.5), ... clip=(0, 1)) >>> scaler.fit_transform(df) a 0 0.000000 1 0.166667 2 0.333333 3 0.500000 >>> df2 = pd.DataFrame({'a': [1, 4, 6, 8, 10]}) a 0 0.000000 1 0.500000 2 0.833333 3 1.000000 4 1.000000 """ _parameter_constraints: T.Dict[str, list] = { 'feature_range': [tuple, list], 'clip': [None, tuple, list], 'p': [int, float], } def __init__( self, feature_range: T.Tuple[float, float] = (0, 1), *, clip: T.Optional[T.Tuple[float, float]] = None, p: float = 100.0, ): self.feature_range = feature_range self.clip = clip self.p = p
[docs] def fit(self, X, y=None): """Fits the scaler to the data. Args: X (np.array): Data. y ([type], optional): Unused. Returns: MinMaxScaler: Returns self. """ # FIXME: hack to preserve output format # update if output_format can be preserved through sklearn if isinstance(X, pd.DataFrame): self.set_output(transform='pandas') self._validate_params() X = self._validate_data(X, dtype=FLOAT_DTYPES, reset=True) self.data_min_ = np.min(X, axis=0) self.data_max_ = np.percentile(X, self.p, axis=0) self.data_range_ = self.data_max_ - self.data_min_ f_range = self.feature_range self.scale_ = (f_range[1] - f_range[0]) / self.data_range_ self.min_ = f_range[0] - self.data_min_ * self.scale_ return self
[docs] def transform(self, X) -> np.ndarray: """Transforms ``X`` to the new feature range. Args: X (np.array): Data. Returns: np.array: Returns the scaled ``X``. """ check_is_fitted(self) X = self._validate_data(X, copy=True, dtype=FLOAT_DTYPES, reset=False) X *= self.scale_ X += self.min_ if self.clip is not None: X = np.clip(X, self.clip[0], self.clip[1]) return X
[docs] class FeatureCreator(BaseEstimator, TransformerMixin): """Creates new features from existing or calculated features. .. versionadded:: 0.6.0 Arguments: features (list(dict[str, Any] | NewFeatureModel)): List of new features. Expects the dict to match :class:`NewFeatureModel`. Example: >>> import pandas as pd >>> from mlnext import FeatureCreator >>> df = pd.DataFrame( ... { ... "height": [1, 2, 3], ... "width": [3, 2, 1], ... "a": [True, False, True], ... "b": [True, True, False], ... } ... ) >>> t = FeatureCreator( ... features=[ ... { ... "name": "area", ... "features": ["height", "width"], ... "op": "mul", ... }, ... { ... "name": "AandB", ... "features": ["a", "b"], ... "op": "and", ... }, ... { ... "name": "sum", ... "features": ["height", "width"], ... "op": "add", ... "keep": False, ... }, ... { ... "name": "area-sum", ... "features": ["area", "sum"], ... "op": "sub", ... }, ... ] ... ) >>> t.transform(df) height width a b area AandB area-sum 1 3 True True 3 True -1 2 2 False True 4 False 0 3 1 True False 3 False -1 """ def __init__( self, features: T.List[T.Union[T.Dict[str, T.Any], NewFeatureModel]], ): super().__init__() self.features = list(self._parse_inputs(features)) def _parse_inputs( self, features: T.List[T.Union[T.Dict[str, T.Any], NewFeatureModel]], ) -> T.Iterator[NewFeatureModel]: """Parses features to the correct datatype. Args: features (list[dict[str, Any] | NewFeatureModel]): Features. Raises: ValueError: Raised if a datatype is not accepted. Yields: Iterator[NewFeatureModel]: Returns the feature model. """ if not isinstance(features, (list, set)): raise ValueError( 'Expected features to be of type list or set. ' f'Got: {type(features)}.' ) for idx, feature in enumerate(features): if isinstance(feature, NewFeatureModel): yield feature if not isinstance(feature, dict): raise ValueError( f'Expected feature at index {idx} to be either a dict ' f'or NewFeatureModel. Got: {type(feature)}.' ) yield NewFeatureModel(**feature) def fit(self, X, y=None): return self
[docs] def transform(self, X: pd.DataFrame) -> pd.DataFrame: """Calculates new featrues based on the given description. Args: X (pd.DataFrame): Input data. Raises: ValueError: Raised if a feature is missing. Returns: pd.DataFrame: Returns the updated dataframe. """ X = X.copy() for feature in self.features: X[feature.name] = feature.calculate(X) drop_features = [ feature.name for feature in self.features if not feature.keep ] X = X.drop(drop_features, axis=1) return X
[docs] class LengthTransformer(BaseEstimator, TransformerMixin): """Pad or truncates the input to an fixed length by either a set length or a fitted length. Args: pad_length (int | None): Length to pad the data to. Default: None. fill_value (int): Value to pad data with. Default: 0. truncate (bool): Whether to truncate if the length exceeds pad_length. If False, an error is raised for an input longer than pad_length. .. versionadded:: 0.6.0 Example: >>> import pandas as pd >>> from mlnext import LengthTransformer >>> df = pd.DataFrame({'a': [0, 1, 2], 'b': [1, 2, 3]}) >>> t = LengthTransformer(pad_length=5, fill_value=-1) >>> t.fit_transform(df) a b 0 0 1 1 1 2 2 2 3 3 -1 -1 4 -1 -1 """ _parameter_constraints: T.Dict[str, list] = { 'pad_length': [int, None], 'fill_value': [int], } def __init__( self, pad_length: T.Union[int, None] = None, fill_value: int = 0, truncate: bool = False, ) -> None: super().__init__() self.pad_length = pad_length self.fill_value = fill_value self.truncate = truncate
[docs] def fit(self, X: pd.DataFrame, y=None): """Sets the pad_length to the length of the fitted dataframe (if pad_length is not defined). Args: X (pd.DataFrame): Data. y (_type_, optional): Labels (ignored). Defaults to None. Returns: LengthTransformer: Returns self. """ if self.pad_length is None: self.pad_length_ = X.shape[0] else: self.pad_length_ = self.pad_length return self
[docs] def transform(self, X: pd.DataFrame) -> pd.DataFrame: """Pads or truncates `X` to `pad_length_`. Args: X (pd.DataFrame): Data. Raises: ValueError: Raised if X is longer than pad_length and truncate is False. Returns: pd.DataFrame: Returns the new dataframe of length pad_length. """ check_is_fitted(self, ['pad_length_']) if X.shape[0] > self.pad_length_ and not self.truncate: raise ValueError( f'Input sequence ({X.shape[0]}) is longer than the one found ' f'when fit or set ({self.pad_length_}). To avoid this problem ' 'set truncate to True.' ) X = X.reindex( range(self.pad_length_), fill_value=self.fill_value, axis=0, copy=True, ) return X
[docs] class RelativeTimeEncoder(BaseEstimator, TransformerMixin): """Calculates the relative time based on a ``timestamp_column``. Args: timestamp_column (str): Name of the timestamp column. inplace (bool): Whether to perform the operation inplace and replace the timestamp_column with the relative time. output_name (str): Name of the output column. Inplace must be set to False. If inplace is False and output_name is None, then the new column is the timestamp column with _relative as a suffix. offset (int): Offset added to the relative time. unit (int): Unit of the time difference. .. versionadded:: 0.6.1 Example: >>> import pandas as pd >>> from mlnext import RelativeTimeEncoder >>> data = pd.DataFrame({'time': pd.date_range('')}) >>> encoder = pipeline.RelativeTimeEncoder( >>> timestamp_column='time', >>> inplace=False, >>> output_name='time_r', >>> offset=offset, >>> unit=unit, >>> ) >>> data = pd.DataFrame( >>> { >>> "time": pd.date_range( >>> "2024-10-01 10:00:00", >>> freq=f"2ms", >>> periods=5, >>> ) >>> } >>> ) >>> encoder.fit_transform(data) time time_r 0 2024-10-01 10:00:00.000 0.100 1 2024-10-01 10:00:00.002 0.102 2 2024-10-01 10:00:00.004 0.104 3 2024-10-01 10:00:00.006 0.106 4 2024-10-01 10:00:00.008 0.108 """ _unit_convert = { 'd': 60 * 60 * 24, 'h': 60 * 60, 'min': 60, 's': 1, 'ms': 1 / 1000, } def __init__( self, timestamp_column: str, inplace: bool = True, output_name: T.Optional[str] = None, offset: int = 0, unit: T.Literal['d', 'h', 'min', 's', 'ms'] = 'ms', ): super().__init__() self.timestamp_column = timestamp_column self.output_name = ( timestamp_column if inplace else ( output_name if output_name is not None else f'{timestamp_column}_relative' ) ) self.offset = offset self.unit = unit def fit(self, X: pd.DataFrame, y=None): return self
[docs] def transform(self, X: pd.DataFrame) -> pd.DataFrame: """Calculates the relative time for a timestamp column. Args: X (pd.DataFrame): Input. Raises: ValueError: Raised if the timestamp column was not found. Returns: pd.DataFrame: Returns the new dataframe. """ if self.timestamp_column not in X.columns: raise ValueError( f'Timestamp column "{self.timestamp_column}" not found in ' f'input. Available columns: {list(X.columns)}.' ) X = X.copy() column = pd.to_datetime(X.loc[:, self.timestamp_column]) relative_time = (column - column[0]).dt.total_seconds().to_numpy() X[self.output_name] = (relative_time) / self._unit_convert[ self.unit ] + self.offset return X