""" Module for data loading and manipulation.
"""
import os
import typing as T
import warnings
import numpy as np
import pandas as pd
from numpy.lib.stride_tricks import sliding_window_view
from .utils import check_ndim
# Public API of this module: the names exported by a star-import.
__all__ = [
'load_data_3d',
'load_data',
'temporalize',
'detemporalize',
'sample_normal',
'sample_bernoulli'
]
def load_data_3d(
    path: str,
    *,
    timesteps: int,
    format: T.Optional[T.Dict[str, T.Any]] = None,
    verbose: bool = True
) -> np.ndarray:
    """Loads data from `path` and temporalizes it with `timesteps`.

    The file is read with ``load_data`` and the resulting frame is passed to
    ``temporalize`` (with its default stride).

    Args:
        path (str): Path to file.
        timesteps (int): Window size.
        format (T.Dict[str, T.Any], optional): Format args that are forwarded
            as keyword arguments to pd.read_csv. ``None`` (the default) means
            no extra arguments.
        verbose (bool): Whether to print status information.

    Returns:
        np.ndarray: Returns the data as a 3d array.

    Example:
        >>> # Loading 2d data and reshaping it to 3d
        >>> X_train = load_data_3d(path='./data/train.csv', timesteps=10)
        >>> X_train.shape
        (100, 10, 18)
    """
    # A ``None`` sentinel replaces the former mutable ``{}`` default so that
    # no dict instance is shared between calls.
    read_csv_kwargs = {} if format is None else format

    df = load_data(path=path, verbose=verbose, **read_csv_kwargs)
    return temporalize(data=df, timesteps=timesteps, verbose=verbose)
def load_data(path: str, *, verbose: bool = True, **kwargs) -> pd.DataFrame:
    """Reads the csv file at `path` into a data frame.

    Args:
        path (str): Path to csv.
        verbose (bool): Whether to print the file name together with the
            number of loaded rows and columns.
        **kwargs: Keywords that are passed on to pd.read_csv.

    Returns:
        pd.DataFrame: Returns the loaded data.

    Example:
        >>> # Loading data from a csv file with custom separator
        >>> data = load_data('./data/train.csv', sep=',')
        Loaded train.csv with 1000 rows and 18 columns.
    """
    frame = pd.read_csv(path, **kwargs)

    if not verbose:
        return frame

    # report only the file name, not the directory part of the path
    filename = os.path.basename(path)
    n_rows, n_cols = frame.shape
    print(f'Loaded {filename} with {n_rows} rows and {n_cols} columns.')

    return frame
def temporalize(
    data: T.Union[pd.DataFrame, np.ndarray],
    *,
    timesteps: int,
    stride: int = 0,
    verbose: bool = False
) -> np.ndarray:
    """Transforms a 2 dimensional array (rows, features) into a 3 dimensional
    array of shape (new_rows, timesteps, features).

    A window of ``timesteps`` consecutive rows is moved along axis 0 and every
    ``stride``-th window is kept. If ``stride=0`` or ``stride=timesteps``, the
    windows do not overlap and the operation is equivalent to
    ``data.reshape(-1, timesteps, features)``.

    Note: only complete windows are produced, i.e. trailing rows that do not
    fill a whole window are discarded (e.g. if rows % timesteps != 0).

    Note: the result is created with numpy's ``sliding_window_view`` (default
    ``writeable=False``) on a copy of ``data``, so it is a read-only view.

    Arguments:
        data (pd.DataFrame, np.ndarray): Data to transform.
        timesteps (int): Number of timesteps per window (must be >= 1).
        stride (int): Step size along the first axis (must be >= 0;
            0 is replaced by ``timesteps``) (Default: 0).
        verbose (bool): Whether to print status information.

    Returns:
        np.ndarray: Returns an array of shape new_rows x timesteps x features
        where new_rows = (rows - timesteps) // stride + 1.

    Raises:
        ValueError: If ``timesteps < 1`` or ``stride < 0``. numpy's
            ``sliding_window_view`` also raises it if ``data`` has fewer rows
            than ``timesteps``.

    Warns:
        UserWarning: If ``stride > timesteps``, because rows between two
            windows are skipped and cannot be restored by ``detemporalize``.

    Example:
        >>> import numpy as np
        >>> import mlnext
        >>> # setup data
        >>> i, j = np.ogrid[:6, :3]
        >>> data = 10 * i + j
        >>> print(data)
        [[ 0  1  2]
         [10 11 12]
         [20 21 22]
         [30 31 32]
         [40 41 42]
         [50 51 52]]
        >>> # Transform 2d data into 3d
        >>> mlnext.temporalize(data=data, timesteps=2, verbose=True)
        Old shape: (6, 3). New shape: (3, 2, 3).
        [[[ 0  1  2]
          [10 11 12]]
         [[20 21 22]
          [30 31 32]]
         [[40 41 42]
          [50 51 52]]]
        >>> # Transform 2d into 3d with stride=1
        >>> mlnext.temporalize(data, timesteps=3, stride=1, verbose=True)
        Old shape: (6, 3). New shape: (4, 3, 3).
        [[[ 0  1  2]
          [10 11 12]
          [20 21 22]]
         [[10 11 12]
          [20 21 22]
          [30 31 32]]
         [[20 21 22]
          [30 31 32]
          [40 41 42]]
         [[30 31 32]
          [40 41 42]
          [50 51 52]]]
    """
    # Validate the scalar arguments first so that an invalid call fails
    # before the (potentially large) input is copied.
    if timesteps < 1:
        raise ValueError('Timesteps must be at least 1.')

    if stride < 0:
        raise ValueError('Stride must be non-negative.')

    data = np.array(data)
    old_shape = data.shape

    # check_ndim is a project helper; presumably it rejects non-2d input
    check_ndim(data, ndim=2)

    if stride > timesteps:
        # stacklevel=2 attributes the warning to the caller's line
        warnings.warn(
            f'Reversion with mlnext.detemporalize will result in a loss of '
            f'rows (stride: {stride} larger than timesteps: {timesteps}).',
            stacklevel=2)

    # stride = 0 and stride=timesteps is the same as a simple reshape
    # to (rows, timesteps, features) (stride=0 is replaced by timesteps)
    stride = stride or timesteps

    # The window spans all features, so the view has the shape
    # (rows - timesteps + 1, 1, timesteps, features); the singleton axis is
    # dropped and only every `stride`-th window is kept.
    data = sliding_window_view(
        data,
        window_shape=(timesteps, data.shape[-1]),
    ).squeeze(axis=1)[::stride]

    if verbose:
        print(f'Old shape: {old_shape}. New shape: {data.shape}.')

    return data
def detemporalize(
    data: np.ndarray,
    *,
    stride: int = 0,
    last_point_only: bool = False,
    verbose: bool = False
) -> np.ndarray:
    """
    Transforms a 3 dimensional array (rows, timesteps, features) into a 2
    dimensional array (new_rows, features).

    If ``stride`` >= timesteps or 0, then the operation is equivalent to
    ``data.reshape(-1, features)`` and new_rows equals rows * timesteps.
    If 0 < ``stride`` < timesteps, the overlap between consecutive windows
    that was induced by the stride is removed: the first ``stride`` points of
    every window except the last and the complete last window are kept, so
    new_rows equals (rows - 1) * stride + timesteps.
    If ``last_point_only=True`` then only the last point in each window is
    kept (``stride`` is not used) and new_rows equals rows.

    Input with less than 3 dimensions is returned as is (as a copy).

    Arguments:
        data (np.ndarray): Array to transform.
        stride (int): Stride that was used to transform the array from
            2d into 3d (must be >= 0).
        last_point_only (bool): Whether to only take the last point of
            each window.
        verbose (bool): Whether to print old and new shape.

    Returns:
        np.ndarray: Returns an array of shape new_rows x features.

    Raises:
        ValueError: If ``data`` is 3 dimensional and ``stride < 0``.

    Example:
        >>> import numpy as np
        >>> import mlnext
        >>> # setup data
        >>> i, j = np.ogrid[:6, :3]
        >>> data = 10 * i + j
        >>> # Transform 3d data into 2d
        >>> data_3d = mlnext.temporalize(data, timesteps=2)
        >>> print(data_3d)
        [[[ 0  1  2]
          [10 11 12]]
         [[20 21 22]
          [30 31 32]]
         [[40 41 42]
          [50 51 52]]]
        >>> mlnext.detemporalize(data_3d, verbose=True)
        Old shape: (3, 2, 3). New shape: (6, 3).
        [[ 0  1  2]
         [10 11 12]
         [20 21 22]
         [30 31 32]
         [40 41 42]
         [50 51 52]]
        >>> # Transform 3d data into 2d with stride=1
        >>> data_3d = mlnext.temporalize(data,
        ...                              timesteps=3, stride=1, verbose=True)
        Old shape: (6, 3). New shape: (4, 3, 3).
        >>> mlnext.detemporalize(data_3d, stride=1, verbose=True)
        Old shape: (4, 3, 3). New shape: (6, 3).
        [[ 0  1  2]
         [10 11 12]
         [20 21 22]
         [30 31 32]
         [40 41 42]
         [50 51 52]]
        >>> # Take only the last point from each window
        >>> mlnext.detemporalize(data_3d, last_point_only=True, verbose=True)
        Old shape: (4, 3, 3). New shape: (4, 3).
        [[20 21 22]
         [30 31 32]
         [40 41 42]
         [50 51 52]]
    """
    data = np.array(data)

    if data.ndim < 3:
        # nothing to do
        return data

    # check_ndim is a project helper; presumably it rejects ndim != 3
    check_ndim(data, ndim=3)
    rows, timesteps, features = data.shape  # (rows, timesteps, features)

    if stride < 0:
        raise ValueError('Stride must be non-negative.')

    if last_point_only:
        # take only the last point in each window
        data = data[:, -1, :]
    elif rows == 0:
        # no windows at all: there is no last window to append
        data = data.reshape(0, features)
    else:
        # a stride of 0 or >= timesteps means the windows do not overlap
        step = stride if 0 < stride < timesteps else timesteps

        # the last window is needed completely
        last_window = data[-1]

        # the first `step` points of all other windows are the points that
        # are not repeated in the following window
        head = data[:-1, :step, :].reshape(-1, features)

        # concat along axis 0
        data = np.concatenate((head, last_window), axis=0)

    if verbose:
        print(f'Old shape: {(rows, timesteps, features)}. '
              f'New shape: {data.shape}.')

    return data
def sample_normal(*, mean: np.ndarray, std: np.ndarray) -> np.ndarray:
    """Draws samples from a gaussian distribution.

    The samples are drawn from numpy's global random state
    (``np.random.normal``) with mu=`mean` and sigma=`std`.

    Args:
        mean (np.ndarray): Mean of the normal distribution.
        std (np.ndarray): Standard deviation of the normal distribution.

    Returns:
        np.ndarray: Returns the drawn samples.

    Example:
        >>> # Sample from a normal distribution with mean and standard dev.
        >>> sample_normal(mean=[0.1], std=[1])
        array([-0.77506174])
    """
    draw = np.random.normal
    samples = draw(loc=mean, scale=std)
    return samples
def sample_bernoulli(mean: np.ndarray) -> np.ndarray:
    """Draws samples from a bernoulli distribution with `mean`.

    The samples are drawn from numpy's global random state
    (``np.random.binomial``).

    Args:
        mean (np.ndarray): Mean of the bernoulli distribution, i.e. the
            probability of drawing a 1.

    Returns:
        np.ndarray: Returns the drawn samples.

    Example:
        >>> # Sample from a bernoulli distribution with mean
        >>> sample_bernoulli(mean=0.2)
        0
    """
    # a bernoulli variable is a binomial variable with a single trial
    single_trial = 1
    outcome = np.random.binomial(single_trial, mean)
    return outcome