# Source code for mlnext.pipeline
""" Module for data preprocessing.
"""
import datetime
import typing as T
import warnings
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator
from sklearn.base import OneToOneFeatureMixin
from sklearn.base import TransformerMixin
from sklearn.utils.validation import check_is_fitted
from sklearn.utils.validation import FLOAT_DTYPES
# Public API of this module; names exported via `from mlnext.pipeline import *`.
__all__ = [
    'ColumnSelector',
    'ColumnDropper',
    'ColumnRename',
    'NaDropper',
    'Clip',
    'DatetimeTransformer',
    'NumericTransformer',
    'TimeframeExtractor',
    'DateExtractor',
    'ValueMapper',
    'Sorter',
    'Fill',
    'TimeOffsetTransformer',
    'ConditionedDropper',
    'ZeroVarianceDropper',
    'SignalSorter',
    'ColumnSorter',
    'DifferentialCreator',
    'ClippingMinMaxScaler'
]
class ColumnSelector(BaseEstimator, TransformerMixin):
    """Transformer that keeps only a named subset of columns.

    Example:
        >>> data = pd.DataFrame({'a': [0], 'b': [0]})
        >>> ColumnSelector(keys=['a']).transform(data)
        pd.DataFrame({'a': [0]})
    """

    def __init__(self, keys: T.List[str]):
        """Creates ColumnSelector.

        Args:
            keys (T.List[str]): Names of the columns to keep.
        """
        self._keys = keys

    def fit(self, X, y=None):
        # Stateless transformer; nothing is learned from the data.
        return self

    def transform(self, X):
        """Restricts `X` to the selected columns.

        Args:
            X (pd.DataFrame): Dataframe.

        Returns:
            pd.DataFrame: Dataframe containing only the selected columns.
        """
        return X.loc[:, self._keys]
class ColumnDropper(BaseEstimator, TransformerMixin):
    """Transformer that removes a given set of columns by name.

    Example:
        >>> data = pd.DataFrame({'a': [0], 'b': [0]})
        >>> ColumnDropper(columns=['b']).transform(data)
        pd.DataFrame({'a': [0]})
    """

    def __init__(
            self,
            *,
            columns: T.Union[T.List[str], T.Set[str]],
            verbose: bool = False
    ):
        """Creates ColumnDropper.

        Args:
            columns (T.Union[T.List[str], T.Set[str]]): Columns to drop.
            verbose (bool): Whether to print a summary on transform.
        """
        self.columns = set(columns)
        self.verbose = verbose

    def fit(self, X, y=None):
        # Stateless transformer; nothing is learned from the data.
        return self

    def transform(self, X):
        """Drops the configured columns from `X`.

        Warns if a configured column is absent from `X`.

        Args:
            X (pd.DataFrame): Dataframe.

        Returns:
            pd.DataFrame: Dataframe without the dropped columns.
        """
        present = set(X.columns.to_list())
        missing = self.columns - present
        if missing:
            warnings.warn(f'Columns {missing} not found in dataframe.')
        if self.verbose:
            print(f'New columns: {present - self.columns}. '
                  f'Removed: {self.columns}.')
        # errors='ignore': absent columns are skipped silently here,
        # the warning above already reported them
        return X.drop(self.columns, axis=1, errors='ignore')
class ColumnRename(BaseEstimator, TransformerMixin):
    """Transformer that renames columns with a mapper function.

    Example:
        >>> data = pd.DataFrame({'a.b.c': [0], 'd.e.f': [0]})
        >>> ColumnRename(lambda x: x.split('.')[-1]).transform(data)
        pd.DataFrame({'c': [0], 'f': [0]})
    """

    def __init__(self, mapper: T.Callable[[str], str]):
        """Creates ColumnRename.

        Args:
            mapper (T.Callable[[str], str]): Function mapping an old column
              name to its new name, e.g. ``lambda x: x.split('.')[-1]``.
        """
        self.mapper = mapper

    def fit(self, X, y=None):
        # Stateless transformer; nothing is learned from the data.
        return self

    def transform(self, X):
        """Renames the columns of `X` with the mapper function.

        Args:
            X (pd.DataFrame): Dataframe.

        Returns:
            pd.DataFrame: Dataframe with renamed columns.
        """
        # pd.DataFrame.rename applies the callable to each column label
        return X.rename(columns=self.mapper)
class NaDropper(BaseEstimator, TransformerMixin):
    """Transformer that drops every row containing a NA value.

    Example:
        >>> data = pd.DataFrame({'a': [0, 1], 'b': [0, np.nan]})
        >>> NaDropper().transform(data)
        pd.DataFrame({'a': [0], 'b': [0]})
    """

    def __init__(self):
        pass

    def fit(self, X, y=None):
        # Stateless transformer; nothing is learned from the data.
        return self

    def transform(self, X):
        # Delegate to pandas: a row is dropped if any of its values is NA.
        return X.dropna()
class Clip(BaseEstimator, TransformerMixin):
    """Transformer that clips values to a lower and upper bound.

    Example:
        >>> data = pd.DataFrame({'a': [-0.1, 1.2], 'b': [0.5, 0.6]})
        >>> Clip().transform(data)
        pd.DataFrame({'a': [0, 1], 'b': [0.5, 0.6]})
    """

    def __init__(self, lower: float = 0.0, upper: float = 1.0):
        """Creates Clip.

        Thin wrapper around ``pd.DataFrame.clip``: values below `lower` are
        set to `lower`, values above `upper` are set to `upper`.

        Args:
            lower (float, optional): Lower bound. Defaults to 0.0.
            upper (float, optional): Upper bound. Defaults to 1.0.
        """
        self.lower = lower
        self.upper = upper

    def fit(self, X, y=None):
        # Stateless transformer; nothing is learned from the data.
        return self

    def transform(self, X):
        # Clip every column to the configured [lower, upper] interval.
        return X.clip(lower=self.lower, upper=self.upper, axis=0)
class ColumnTSMapper(BaseEstimator, TransformerMixin):
    # Maps a long-format sensor log (one row per timestamp/sensor/value
    # triple) to a wide dataframe with one column per sensor, resampled to
    # an equidistant time grid.

    def __init__(
            self,
            cols: T.List[str],
            timedelta: pd.Timedelta = pd.Timedelta(250, 'ms'),
            classes: T.Optional[T.List[str]] = None,
            verbose: bool = False
    ):
        """Creates ColumnTSMapper.
        Expects the timestamp column to be of type pd.Timestamp.

        Args:
            cols (T.List[str]): names of [0] timestamp column,
              [1] sensor names, [2] sensor values.
            timedelta (pd.Timedelta): Timedelta to resample with.
            classes (T.List[str]): T.List of sensor names.
            verbose (bool, optional): Whether to allow prints.
        """
        super().__init__()
        self._cols = cols
        self._timedelta = timedelta
        self._verbose = verbose
        # if the sensor names are already known, fit() can be skipped
        if classes is not None:
            self.classes_ = classes

    def fit(self, X, y=None):
        """Gets the unique values in the sensor name column that
        are needed to expand the dataframe.

        Args:
            X (pd.DataFrame): Dataframe.
            y (array-like, optional): Labels. Defaults to None.

        Returns:
            ColumnTSMapper: Returns this.
        """
        classes = X[self._cols[1]].unique()
        # 'Timestamp' is prepended so classes_ matches the output columns
        self.classes_ = np.hstack(['Timestamp', classes])
        return self

    def transform(self, X):
        """Performs the mapping to equidistant timestamps.

        Args:
            X (pd.DataFrame): Dataframe.

        Raises:
            ValueError: Raised if column is not found in `X`.

        Returns:
            pd.DataFrame: Returns the remapped dataframe.
        """
        # check is fit had been called
        check_is_fitted(self)
        # check if all columns exist
        if not all([item in X.columns for item in self._cols]):
            raise ValueError(
                f'Columns {self._cols} not found in DataFrame '
                f'{X.columns.to_list()}.')
        # split sensors into individual columns
        # create new dataframe with all _categories
        # use timestamp index, to use resample later on
        # initialized with na
        sensors = pd.DataFrame(
            None, columns=self.classes_, index=X[self._cols[0]])
        # group by sensor
        groups = X.groupby([self._cols[1]])
        # write sensor values to sensors which is indexed by the timestamp
        for g in groups:
            # g[0] is the sensor name (target column), g[1] the sub-frame;
            # rows are aligned via the shared timestamp index
            sensors.loc[g[1][self._cols[0]], g[0]
                        ] = g[1][self._cols[2]].to_numpy()
        # best-effort numeric conversion; non-numeric columns stay as-is
        # NOTE(review): errors='ignore' is deprecated in recent pandas
        sensors = sensors.apply(pd.to_numeric, errors='ignore')
        # fill na, important before resampling
        # otherwise mean affects more samples than necessary
        # first: forward fill to next valid observation
        # second: backward fill first missing rows
        sensors = sensors.fillna(method='ffill').fillna(method='bfill')
        # resamples to equidistant timeframe
        # take avg if multiple samples in the same timeframe
        sensors = sensors.resample(self._timedelta).mean()
        sensors = sensors.fillna(method='ffill').fillna(method='bfill')
        # FIXME: to avoid nans in model, but needs better fix
        sensors = sensors.fillna(value=0.0)
        # move index to column and use rangeindex
        sensors['Timestamp'] = sensors.index
        sensors.index = pd.RangeIndex(stop=sensors.shape[0])
        if self._verbose:
            start, end = sensors.iloc[0, 0], sensors.iloc[-1, 0]
            print('ColumnTSMapper: ')
            print(f'{sensors.shape[0]} rows. '
                  f'Mapped to {self._timedelta.total_seconds()}s interval '
                  f'from {start} to {end}.')
        return sensors
class DatetimeTransformer(BaseEstimator, TransformerMixin):
    """Transforms a list of columns to datetime.

    Example:
        >>> data = pd.DataFrame({'dt': ['2021-07-02 16:30:00']})
        >>> data = DatetimeTransformer(columns=['dt']).transform(data)
        >>> data.dtypes
        dt    datetime64[ns]
    """

    def __init__(
            self,
            *,
            columns: T.List[str],
            dt_format: T.Optional[str] = None
    ):
        """Creates DatetimeTransformer.

        Parses a list of columns to pd.Timestamp.

        Args:
            columns (T.List[str]): Names of the columns to parse.
            dt_format (T.Optional[str]): Optional format string passed to
              ``pd.to_datetime``.
        """
        super().__init__()
        self._columns = columns
        self._format = dt_format

    def fit(self, X, y=None):
        # Stateless transformer; nothing is learned from the data.
        return self

    def transform(self, X):
        """Parses `columns` to datetime.

        Args:
            X (pd.DataFrame): Dataframe.

        Raises:
            ValueError: Raised if columns are missing in `X`.

        Returns:
            pd.DataFrame: Returns the dataframe with datetime columns.
        """
        X = X.copy()
        # check if columns in dataframe
        if len(diff := set(self._columns) - set(X.columns)):
            # FIX: a space was missing between 'columns' and the list,
            # producing messages like '... with columns['a', 'b'].'
            raise ValueError(
                f'Columns {diff} not found in DataFrame with columns '
                f'{X.columns.to_list()}.')
        # parse to pd.Timestamp, column-wise
        X[self._columns] = X[self._columns].apply(
            lambda x: pd.to_datetime(x, format=self._format), axis=0)
        return X
class NumericTransformer(BaseEstimator, TransformerMixin):
    """Transforms a list of columns to numeric datatype.

    Example:
        >>> data = pd.DataFrame({'a': [0], 'b': ['1']})
        >>> data.dtypes
        a     int64
        b    object
        >>> data = NumericTransformer().transform(data)
        >>> data.dtypes
        a    int64
        b    int64
    """

    def __init__(self, *, columns: T.Optional[T.List[str]] = None):
        """Creates NumericTransformer.

        Args:
            columns (T.Optional[T.List[str]]): Names of the columns to
              parse; if None, all columns are attempted.
        """
        super().__init__()
        self._columns = columns

    def fit(self, X, y=None):
        # Stateless transformer; nothing is learned from the data.
        return self

    def transform(self, X):
        """Parses `columns` to numeric.

        Args:
            X (pd.DataFrame): Dataframe.

        Raises:
            ValueError: Raised if columns are missing in `X`.

        Returns:
            pd.DataFrame: Returns the dataframe with numeric columns.
        """
        X = X.copy()
        # None means: attempt to convert every column
        if self._columns is None:
            columns = X.columns.to_list()
        else:
            columns = self._columns
        cols = X.columns
        diff = list(set(columns) - set(cols))
        if diff:
            raise ValueError(f'Columns found: {cols.to_list()}. '
                             f'Columns missing: {diff}.')
        # parse to numeric, column-wise
        X[columns] = X[columns].apply(pd.to_numeric, axis=0)
        return X
class TimeframeExtractor(BaseEstimator, TransformerMixin):
    """Drops samples whose time of day is not between a given start and end
    time. Limits are inclusive.

    Example:
        >>> data = pd.DataFrame(
            {'dates': [datetime.datetime(2021, 7, 2, 9, 50, 0),
                       datetime.datetime(2021, 7, 2, 11, 0, 0),
                       datetime.datetime(2021, 7, 2, 12, 10, 0)],
             'values': [0, 1, 2]})
        >>> TimeframeExtractor(time_column='dates',
                               start_time=datetime.time(10, 0, 0),
                               end_time=datetime.time(12, 0, 0)
                               ).transform(data)
        pd.DataFrame({'dates': datetime.datetime(2021, 7, 2, 11, 0, 0),
                      'values': [1]})
    """

    def __init__(
            self,
            *,
            time_column: str,
            start_time: datetime.time,
            end_time: datetime.time,
            invert: bool = False,
            verbose: bool = False
    ):
        """Creates TimeframeExtractor.

        Args:
            time_column (str): Column name of the timestamp column.
            start_time (datetime.time): Start time.
            end_time (datetime.time): End time.
            invert (bool): Whether to keep rows outside the range instead.
            verbose (bool, optional): Whether to allow prints.
        """
        super().__init__()
        self._start = start_time
        self._end = end_time
        self._column = time_column
        self._negate = invert
        self._verbose = verbose

    def fit(self, X, y=None):
        # Stateless transformer; nothing is learned from the data.
        return self

    def transform(self, X):
        """Drops rows whose time of day is outside [`start_time`,
        `end_time`] (or inside, if inverted). Reindexes the dataframe.

        Args:
            X (pd.DataFrame): Dataframe.

        Returns:
            pd.DataFrame: Returns the new dataframe.
        """
        X = X.copy()
        n_before = X.shape[0]
        dates = pd.to_datetime(X[self._column])
        # inclusive time-of-day window; invert flips the selection
        mask = (dates.dt.time >= self._start) & (dates.dt.time <= self._end)
        if self._negate:
            mask = ~mask
        X = X.loc[mask, :]
        X.index = pd.RangeIndex(0, X.shape[0])
        n_after = X.shape[0]
        if self._verbose:
            print(
                'TimeframeExtractor: \n'
                f'{n_after} rows. Dropped {n_before - n_after} '
                f'rows which are {"in" if self._negate else "not in"} between '
                f'{self._start} and {self._end}.'
            )
        return X
class DateExtractor(BaseEstimator, TransformerMixin):
    """Drops rows whose date is not between a start and end date.
    Limits are inclusive.

    Example:
        >>> data = pd.DataFrame(
            {'dates': [datetime.datetime(2021, 7, 1, 9, 50, 0),
                       datetime.datetime(2021, 7, 2, 11, 0, 0),
                       datetime.datetime(2021, 7, 3, 12, 10, 0)],
             'values': [0, 1, 2]})
        >>> DateExtractor(date_column='dates',
                          start_date=datetime.date(2021, 7, 2),
                          end_date=datetime.date(2021, 7, 2)).transform(data)
        pd.DataFrame({'dates': datetime.datetime(2021, 7, 2, 11, 0, 0),
                      'values': [1]})
    """

    def __init__(
            self,
            *,
            date_column: str,
            start_date: datetime.date,
            end_date: datetime.date,
            invert: bool = False,
            verbose: bool = False
    ):
        """Initializes `DateExtractor`.

        Args:
            date_column (str): Name of timestamp column.
            start_date (datetime.date): Start date.
            end_date (datetime.date): End date.
            invert (bool): Whether to keep rows outside the range instead.
            verbose (bool, optional): Whether to allow prints.
        """
        super().__init__()
        self._start = start_date
        self._end = end_date
        self._column = date_column
        self._negate = invert
        self._verbose = verbose

    def fit(self, X, y=None):
        # Stateless transformer; nothing is learned from the data.
        return self

    def transform(self, X):
        """Drops rows whose date is outside [`start_date`, `end_date`]
        (or inside, if inverted). Bounds are inclusive; the result is
        reindexed.

        Args:
            X (pd.Dataframe): Dataframe.

        Returns:
            pd.Dataframe: Returns the new dataframe.
        """
        n_before = X.shape[0]
        dates = pd.to_datetime(X[self._column])
        # inclusive date window; invert flips the selection
        mask = (dates.dt.date >= self._start) & (dates.dt.date <= self._end)
        if self._negate:
            mask = ~mask
        X = X.loc[mask, :]
        X.index = pd.RangeIndex(0, X.shape[0])
        n_after = X.shape[0]
        if self._verbose:
            print(
                'DateExtractor: \n'
                f'{n_after} rows. Dropped {n_before - n_after} rows '
                f'which are {"in" if self._negate else "not in"} between '
                f'{self._start} and {self._end}.'
            )
        return X
class ValueMapper(BaseEstimator, TransformerMixin):
    """Maps values in `columns` according to `classes`. Wrapper for
    pd.DataFrame.replace.

    Example:
        >>> data = pd.DataFrame({'a': [0.0, 1.0, 2.0]})
        >>> ValueMapper(columns=['a'], classes={2.0: 1.0}).transform(data)
        pd.DataFrame({'a': [0.0, 1.0, 1.0]})
    """

    def __init__(
            self,
            *,
            columns: T.List[str],
            classes: T.Dict,
            verbose: bool = False
    ):
        """Initialize `ValueMapper`.

        Args:
            columns (T.List[str]): Names of columns to remap.
            classes (T.Dict): Mapping of old value to new value.
            verbose (bool, optional): Whether to allow prints.
        """
        super().__init__()
        self._columns = columns
        self._classes = classes
        self._verbose = verbose

    def fit(self, X, y=None):
        # Stateless transformer; nothing is learned from the data.
        return self

    def transform(self, X):
        """Remaps values in `columns` according to `classes`.
        Gives UserWarning if unmapped values are found.

        Args:
            X (pd.DataFrame): Dataframe.

        Returns:
            pd.DataFrame: Returns the new dataframe with remapped values.
        """
        X = X.copy()
        # warn about values present in the data that have no mapping;
        # pd.DataFrame.replace leaves them unchanged.
        # FIX: previously reported the reverse difference (mapping keys
        # absent from the data) although the condition checks for data
        # values missing from the mapping.
        values = pd.unique(X[self._columns].values.ravel('K'))
        if not set(self._classes.keys()).issuperset(values):
            warnings.warn(
                f'Values {set(values) - set(self._classes.keys())} ignored.')
        X[self._columns] = X[self._columns].replace(self._classes)
        return X
class Sorter(BaseEstimator, TransformerMixin):
    """Sorts the dataframe by a list of columns. Wrapper for
    pd.DataFrame.sort_values.

    Example:
        >>> data = pd.DataFrame({'a': [0, 1], 'b': [1, 0]})
        >>> Sorter(columns=['b'], ascending=True).transform(data)
        pd.DataFrame({'a': [1, 0], 'b': [0, 1]})
    """

    def __init__(
            self,
            *,
            columns: T.List[str],
            ascending: bool = True,
            axis: int = 0
    ):
        """Initialize `Sorter`.

        Args:
            columns (T.List[str]): Column names to sort by.
            ascending (bool): Whether to sort in ascending order.
            axis (int): Axis to sort along.
        """
        super().__init__()
        self._columns = columns
        self._ascending = ascending
        self._axis = axis

    def fit(self, X, y=None):
        # Stateless transformer; nothing is learned from the data.
        return self

    def transform(self, X):
        """Sorts `X` by `columns`.

        Args:
            X (pd.DataFrame): Dataframe.

        Returns:
            pd.DataFrame: Returns the sorted Dataframe.
        """
        X = X.copy()
        return X.sort_values(
            by=self._columns,
            ascending=self._ascending,
            axis=self._axis,
        )
class Fill(BaseEstimator, TransformerMixin):
    """Fills NA values with a constant or 'bfill' / 'ffill'.
    Wrapper for df.fillna.

    Example:
        >>> data = pd.DataFrame({'a': [0.0, np.nan]})
        >>> Fill(value=1.0).transform(data)
        pd.DataFrame({'a': [0.0, 1.0]})
    """

    def __init__(
            self,
            *,
            value: T.Any,
            method: T.Optional[str] = None
    ):
        """Initialize `Fill`.

        Args:
            value (T.Any): Constant to fill NAs with (used when `method`
              is None).
            method (T.Optional[str]): 'ffill' or 'bfill'; takes precedence
              over `value` when given.
        """
        super().__init__()
        self._value = value
        self._method = method

    def fit(self, X, y=None):
        # Stateless transformer; nothing is learned from the data.
        return self

    def transform(self, X):
        """Fills NAs.

        Args:
            X (pd.DataFrame): Dataframe.

        Returns:
            pd.DataFrame: Returns the filled dataframe.
        """
        X = X.copy()
        # FIX: pd.DataFrame.fillna rejects `value` and `method` together,
        # so passing both always raised ValueError whenever a method was
        # configured. Use the method when given, the constant otherwise.
        if self._method is not None:
            return X.fillna(method=self._method)
        return X.fillna(self._value)
class TimeOffsetTransformer(BaseEstimator, TransformerMixin):
    """`TimeOffsetTransformer` offsets a datetime by `timedelta`.

    Example:
        >>> data = pd.DataFrame(
            {'dates': [datetime.datetime(2021, 7, 1, 16, 0, 0)]})
        >>> TimeOffsetTransformer(time_columns=['dates'],
                                  timedelta=pd.Timedelta(1, 'h')
                                  ).transform(data)
        pd.DataFrame({'dates': datetime.datetime(2021, 7, 1, 17, 0, 0)})
    """

    def __init__(self, *, time_columns: T.List[str], timedelta: pd.Timedelta):
        """Initialize `TimeOffsetTransformer`.

        Args:
            time_columns (T.List[str]): Names of the timestamp columns to
              offset.
            timedelta (pd.Timedelta): Offset to add.
        """
        super().__init__()
        self._time_columns = time_columns
        self._timedelta = timedelta

    def fit(self, X, y=None):
        # Stateless transformer; nothing is learned from the data.
        return self

    def transform(self, X):
        """Offsets the timestamps in `time_columns` by `timedelta`.

        Args:
            X (pd.DataFrame): Dataframe.

        Returns:
            pd.DataFrame: Returns the dataframe.
        """
        X = X.copy()
        for name in self._time_columns:
            # parse first so string timestamps are supported as well
            X[name] = pd.to_datetime(X[name]) + self._timedelta
        return X
class ConditionedDropper(BaseEstimator, TransformerMixin):
    """Drops rows whose numeric value in `column` is above `threshold`.
    If `invert` is true, rows below `threshold` are dropped instead.

    Example:
        >>> data = pd.DataFrame({'a': [0.0, 1.2, 0.5]})
        >>> ConditionedDropper(column='a', threshold=0.5).transform(data)
        pd.DataFrame({'a': [0.0, 0.5]})
    """

    def __init__(
            self,
            *,
            column: str,
            threshold: float,
            invert: bool = False
    ):
        """Initializes `ConditionedDropper`.

        Args:
            column (str): Column to test the condition against.
            threshold (float): Threshold.
            invert (bool, optional): If False, values above `threshold` are
              dropped; otherwise values below are dropped.
        """
        super().__init__()
        self.column = column
        self.threshold = threshold
        self.inverted = invert

    def fit(self, X, y=None):
        # Stateless transformer; nothing is learned from the data.
        return self

    def transform(self, X):
        """Drops rows above (or below, if inverted) the threshold.

        Args:
            X (pd.DataFrame): Dataframe.

        Returns:
            pd.DataFrame: Returns the dataframe.
        """
        X = X.copy()
        # select the offending rows, then drop them by index
        if self.inverted:
            offending = X[self.column] < self.threshold
        else:
            offending = X[self.column] > self.threshold
        X = X.drop(X[offending].index)
        X.index = pd.RangeIndex(X.shape[0])
        return X
class ZeroVarianceDropper(BaseEstimator, TransformerMixin):
    """Removes all numeric columns that have zero variance.
    Needs to be fitted first. Warns if a column registered as zero
    variance deviates at transform time.

    Example:
        >>> data = pd.DataFrame({'a': [0.0, 0.0], 'b': [1.0, 0.0]})
        >>> ZeroVarianceDropper().fit_transform(data)
        pd.DataFrame({'b': [1.0, 0.0]})
    """

    def __init__(self, verbose: bool = False):
        """Initialize `ZeroVarianceDropper`.

        Args:
            verbose (bool, optional): Whether to print status messages.
        """
        super().__init__()
        self._verbose = verbose

    def _get_zero_variance_columns(self, X: pd.DataFrame) -> T.List[str]:
        """Finds all columns with zero variance.

        Args:
            X (pd.DataFrame): Dataframe.

        Returns:
            T.List[str]: Returns a list of column names.
        """
        variances = X.var()
        return [name for name, value in variances.items() if value == 0.0]

    def fit(self, X, y=None):
        """Finds all columns with zero variance.

        Args:
            X (pd.DataFrame): Dataframe.
            y (array-like, optional): Labels. Defaults to None.

        Returns:
            ZeroVarianceDropper: Returns self.
        """
        self.columns_ = self._get_zero_variance_columns(X)
        if self._verbose:
            print(
                f'Found {len(self.columns_)} columns with 0 variance '
                f'({self.columns_}).')
        return self

    def transform(self, X):
        """Drops all columns found by fit with zero variance.

        Args:
            X (pd.DataFrame): Dataframe.

        Returns:
            pd.DataFrame: Returns the new dataframe.
        """
        check_is_fitted(self, 'columns_')
        X = X.copy()
        # warn if the zero-variance columns changed since fitting
        # (symmetric difference: newly-zero or no-longer-zero columns)
        current = self._get_zero_variance_columns(X)
        disj = {*current} ^ {*self.columns_}
        if len(disj) > 0:
            warnings.warn(f'Found column with higher variance: {disj}.')
        before = X.shape[-1]
        X = X.drop(self.columns_, axis=1)
        if self._verbose:
            after = X.shape[-1]
            print(f'Dropped {before - after} columns.')
        return X
class SignalSorter(BaseEstimator, TransformerMixin):
    """Sorts the columns into continuous and binary signals: first the
    continuous, then the binary signals.

    Example:
        >>> data = pd.DataFrame({'a': [0.0, 1.0], 'b': [0.0, 0.2]})
        >>> SignalSorter().fit_transform(data)
        pd.DataFrame({'b': [1.0, 0.0], 'a': [0.0, 1.0]})
    """

    def __init__(self, verbose: bool = False):
        """Initialize `SignalSorter`.

        Args:
            verbose (bool, optional): Whether to print the computed order.
        """
        super().__init__()
        self.verbose = verbose

    def fit(self, X, y=None):
        # classify each column as binary (True) or continuous (False);
        # sorting by that flag is stable, so relative order within each
        # group is preserved and continuous columns come first
        flags = {name: self._is_binary(X[name]) for name in X.columns}
        self.order_ = sorted(flags.items(), key=lambda item: item[1])
        if self.verbose:
            print(f'Binary: {self.order_}')
        return self

    def _is_binary(self, X: pd.Series) -> bool:
        """Decides whether a column is a binary signal.

        Args:
            X (pd.Series): Column of a data frame.

        Returns:
            bool: Whether `X` is a binary series.
        """
        values = X.unique()
        if len(values) > 2:
            return False
        if len(values) == 1:
            # constant column counts as binary
            return True
        try:
            # exactly two values: binary iff they are 0 and 1
            return set(values.astype('float')) == {0.0, 1.0}
        except Exception:
            # non-numeric values cannot form a 0/1 signal
            return False

    def transform(self, X):
        """Reorders the columns of `X` into a continuous block followed by
        a binary block.

        Args:
            X (pd.DataFrame): Dataframe.

        Returns:
            pd.DataFrame: Returns the sorted dataframe.
        """
        check_is_fitted(self, [])
        X = X.copy()
        return X[[name for name, _ in self.order_]]
class ColumnSorter(BaseEstimator, TransformerMixin):
    """Sorts the dataframe into the same column order as the fitted
    dataframe.

    Example:
        >>> data = pd.DataFrame({'a': [0.0, 1.0], 'b': [0.0, 0.2]})
        >>> (sorter := ColumnSorter()).fit(data)
        >>> sorter.transform(pd.DataFrame({'b': [0.2, 1.0], 'a': [0.0, 0.1]}))
        pd.DataFrame({'a': [0.0, 0.1], 'b': [0.2, 1.0]})
    """

    def __init__(self, *, raise_on_error: bool = True, verbose: bool = False):
        """Initialize `ColumnSorter`.

        Args:
            raise_on_error (bool): Whether to raise an exception when
              columns that were not fitted are found.
            verbose (bool): Whether to print the fitted order.
        """
        super().__init__()
        self.raise_on_error = raise_on_error
        self.verbose = verbose

    def fit(self, X, y=None):
        # remember the column order of the fitted dataframe
        self.columns_ = X.columns.to_numpy()
        if self.verbose:
            print(f'Sorting in order {self.columns_}.')
        return self

    def transform(self, X):
        """Reorders the columns of `X` to the fitted order.

        Args:
            X (pd.DataFrame): Dataframe.

        Returns:
            pd.DataFrame: Returns the sorted Dataframe.
        """
        check_is_fitted(self)
        missing = list(set(self.columns_) - set(X.columns))
        if missing:
            raise ValueError(f'Columns missing: {missing}.')
        additional = list(set(X.columns) - set(self.columns_))
        if additional:
            if self.raise_on_error:
                raise ValueError(f'Found additional columns: {additional}.')
            warnings.warn(f'Found additional columns: {additional}.')
        return X.loc[:, self.columns_]
class DifferentialCreator(BaseEstimator, TransformerMixin):
    """Calculates signal differences between subsequent time points and
    concatenates them to the dataframe as `<name>_dif` columns.

    Example:
        >>> data = pd.DataFrame({'a': [1.0, 2.0, 1.0]})
        >>> dcreator = DifferentialCreator(columns=['a'])
        >>> dcreator.transform(pd.DataFrame(data))
        pd.DataFrame({'a': [1.0, 2.0, 1.0], 'a_dif': [1.0, -1.0, 0.0]})
    """

    def __init__(self, *, columns: T.List[str]):
        """Initialize `DifferentialCreator`.

        Args:
            columns (T.List[str]): Columns to create differences for.
        """
        super().__init__()
        self._columns = columns

    def fit(self, X, y=None):
        # Stateless transformer; nothing is learned from the data.
        return self

    def transform(self, X):
        """Calculates differences between subsequent points; the first row
        (which has no predecessor) is filled with zero.

        Args:
            X (pd.DataFrame): Dataframe.

        Returns:
            pd.DataFrame: Returns the concatenated DataFrame.
        """
        derived = X[self._columns].diff(axis=0)
        derived = derived.fillna(0).add_suffix('_dif')
        return pd.concat([X, derived], axis=1)
class ClippingMinMaxScaler(
        OneToOneFeatureMixin, BaseEstimator, TransformerMixin):
    """Normalizes the fitted data to the interval ``feature_range``. The
    parameter ``p`` can be used to calculate the ``max`` value as the ``p``-th
    percentile of the fitted data, i.e., ``p``% of the data is below.
    Data which exceeds the limits of ``feature_range`` after the scaling can be
    clipped to specific values via a ``clip`` range.

    Example:
        >>> data = pd.DataFrame({'a': [1, 2, 3, 4]})
        >>> scaler = mlnext.ClippingMinMaxScaler(
        ...     feature_range=(0, 0.5),
        ...     clip=(0, 1))
        >>> scaler.fit_transform(data)
                  a
        0  0.000000
        1  0.166667
        2  0.333333
        3  0.500000
        >>> df2 = pd.DataFrame({'a': [1, 4, 6, 8, 10]})
        >>> scaler.transform(df2)
                  a
        0  0.000000
        1  0.500000
        2  0.833333
        3  1.000000
        4  1.000000
    """

    # parameter constraints consumed by sklearn's _validate_params
    _parameter_constraints: T.Dict[str, list] = {
        'feature_range': [tuple, list],
        'copy': ['boolean'],
        'clip': [None, tuple, list],
        'p': [int, float]
    }

    def __init__(
            self,
            feature_range: T.Tuple[float, float] = (0, 1),
            *,
            clip: T.Optional[T.Tuple[float, float]] = None,
            p: float = 100.,
            copy: bool = True
    ):
        """Initializes `ClippingMinMaxScaler`.

        Args:
            feature_range (T.Tuple[float, float]): New feature min and max.
              Defaults to (0., 1.).
            clip (T.Tuple[float, float]): Range to clip values. Defaults to
              None.
            p (float): Percentile of data that is used as data maximum.
              Defaults to 100.
            copy (bool, optional): Whether to create a copy. Defaults to True.
        """
        # plain attribute assignment only, per sklearn estimator convention
        self.feature_range = feature_range
        self.clip = clip
        self.p = p
        self.copy = copy

    def fit(self, X, y=None):
        """Fits the scaler to the data.

        Args:
            X (np.array): Data.
            y ([type], optional): Unused.

        Returns:
            MinMaxScaler: Returns self.
        """
        # FIXME: hack to preserve output format
        # update if output_format can be preserved through sklearn
        if isinstance(X, pd.DataFrame):
            self.set_output(transform='pandas')
        self._validate_params()
        X = self._validate_data(
            X,
            dtype=FLOAT_DTYPES,
            reset=True
        )
        # data max is the p-th percentile (p=100 -> plain maximum)
        self.data_min_ = np.min(X, axis=0)
        self.data_max_ = np.percentile(X, self.p, axis=0)
        self.data_range_ = self.data_max_ - self.data_min_
        # affine map: X * scale_ + min_ sends [data_min_, data_max_]
        # onto feature_range
        f_range = self.feature_range
        self.scale_ = (f_range[1] - f_range[0]) / self.data_range_
        self.min_ = f_range[0] - self.data_min_ * self.scale_
        return self

    def transform(self, X) -> np.ndarray:
        """Transforms ``X`` to the new feature range.

        Args:
            X (np.array): Data.

        Returns:
            np.array: Returns the scaled ``X``.
        """
        check_is_fitted(self)
        X = self._validate_data(
            X,
            copy=self.copy,
            dtype=FLOAT_DTYPES,
            reset=False
        )
        # in-place affine scaling with the fitted parameters
        X *= self.scale_
        X += self.min_
        # optionally clip the scaled values to the configured range
        if self.clip is not None:
            X = np.clip(X, self.clip[0], self.clip[1])
        return X