import pandas
import numpy
import matplotlib.pyplot
from .Util import SavePeriod,IntegrateHour
[docs]
def RemoveOutliersMMADMM(x_in: pandas.core.frame.DataFrame,
df_avoid_periods: pandas.core.frame.DataFrame = pandas.DataFrame([]),
len_mov_avg: int = 4 * 12,
std_def: float = 2,
min_var_def: float = 0.5,
allow_negatives: bool = False,
plot: bool = False,
remove_from_process: list = [],
) -> pandas.core.frame.DataFrame:
"""
Removes outliers from the timeseries on each column using the (M)oving (M)edian (A)bslute
(D)eviation around the (M)oving (M)edian.
A statistical method is used for removing the remaining outliers. In LEYS et al. (2019), the authors state that it
is common practice the use of plus and minus the standard deviation (±σ) around the mean (µ), however,
this measurement is particularly sensitive to outliers. Furthermore, the authors propose the use of the
absolute deviation around the median.Therefore, in this work the limit was set by the median absolute
deviation (MADj) around the moving median (Mj) where j denotes the number of samples of the moving window.
Typically, an MV feeder has a seasonality where in the summer load is higher than in the winter or vice-versa.
Hence, it is vital to use the moving median instead of the median of all the time series.
:param x_in: A pandas.core.frame.DataFrame where the index is of type "pandas.core.indexes.datetime.DatetimeIndex"
and each column contain an electrical quantity time series.
:type x_in: pandas.core.frame.DataFrame
:param df_avoid_periods: The first column with the start and the second column with the end date.
:type df_avoid_periods: pandas.core.frame.DataFrame
:param len_mov_avg: Size of the windows of the moving average.
:type len_mov_avg: int,optional
:param std_def: Absolute standard deviation to be computed around the moving average.
:type std_def: float,optional
:param min_var_def: For low variance data this parameter will set a minimum distance from the upper and lower
boundaries.
:type min_var_def: float,optional
:param allow_negatives: Allow for the lower level to be below zero.
:type allow_negatives: bool,optional
:param plot: A plot of the boundaries and result to debug parameters.
:type plot: bool,optional
:param remove_from_process: Columns to be kept off the process.
:type remove_from_process: list,optional
:raises Exception: if x_in has no DatetimeIndex.
:return: Y: A pandas.core.frame.DataFrame without the outliers
:rtype: Y: pandas.core.frame.DataFrame
"""
# -------------------#
# BASIC INPUT CHECK #
# -------------------#
if not (isinstance(x_in.index, pandas.DatetimeIndex)):
raise Exception("x_in DataFrame has no DatetimeIndex.")
X = x_in.copy(deep=True)
if len(remove_from_process) > 0:
X = X.drop(remove_from_process, axis=1)
Y = X.copy(deep=True)
# ------------------------ OUTLIERS ------------------------
X_mark_outlier = X.copy(deep=True).astype(bool)
X_mark_outlier.loc[:, :] = False
# ---------PROCESSAMENTO OUTLIERS POR MÉDIA MÓVEL
X_moving_median = X.copy(deep=True)
# DESVIO PADRÂO ABSOLUTO ENTORNO DA MEDIANA MOVEL
# ------------ Computa Mediana Móvel ------------#
X_moving_median = X_moving_median.rolling(len_mov_avg).median().shift(-int(len_mov_avg / 2))
#X_moving_median.iloc[-2 * len_mov_avg:, :] = X_moving_median.iloc[-2 * len_mov_avg:, :].fillna(method='ffill')
#X_moving_median.iloc[:2 * len_mov_avg, :] = X_moving_median.iloc[:2 * len_mov_avg, :].fillna(method='bfill')
X_moving_median.iloc[-2 * len_mov_avg:, :] = X_moving_median.iloc[-2 * len_mov_avg:, :].ffill()
X_moving_median.iloc[:2 * len_mov_avg, :] = X_moving_median.iloc[:2 * len_mov_avg, :].bfill()
# ------------ Computa MAD Móvel ------------#
X_mad = X - X_moving_median
X_mad = X_mad.rolling(len_mov_avg).median().shift(-int(len_mov_avg / 2))
#X_mad.iloc[-2 * len_mov_avg:, :] = X_mad.iloc[-2 * len_mov_avg:, :].fillna(method='ffill')
#X_mad.iloc[:2 * len_mov_avg, :] = X_mad.iloc[:2 * len_mov_avg, :].fillna(method='bfill')
X_mad.iloc[-2 * len_mov_avg:, :] = X_mad.iloc[-2 * len_mov_avg:, :].ffill()
X_mad.iloc[:2 * len_mov_avg, :] = X_mad.iloc[:2 * len_mov_avg, :].bfill()
# ------------ Coloca no mínimo de faixa de segurança para dados com baixa variância ------------#
X_mad[X_mad <= min_var_def] = min_var_def
# ------------ MAD Móvel Limites ------------#
X_moving_up = X_moving_median + std_def * X_mad
X_moving_down = X_moving_median - std_def * X_mad
# Allow the lower limit to go negative. Only valid for kVar or bidirectional current/Power.
if not allow_negatives:
X_moving_down[X_moving_down <= 0] = 0
# ------------ Marcando outliers ------------#
X_mark = (X >= X_moving_up) | (X <= X_moving_down)
# ------------ Não marca os intervalos onde não foi possível determinar ------------#
X_mark[X_moving_up.isnull() | X_moving_down.isnull()] = False
X_mark.iloc[:int(len_mov_avg / 2), :] = False
X_mark.iloc[-int(len_mov_avg / 2), :] = False
Y[X_mark] = numpy.nan
# ------------ Não marca os intervalos selecionados ------------#
if df_avoid_periods.shape[0] != 0:
df_values, index_return = SavePeriod(X, df_avoid_periods)
Y.loc[index_return, :] = df_values
# return the keep out columns
if len(remove_from_process) > 0:
Y = pandas.concat([Y, x_in.loc[:, remove_from_process]], axis=1)
# For debug
if plot:
#ax = X_moving_median.plot()
ax = x_in.plot(title = 'RemoveOutliersMMADMM')
#X_mad.plot(ax=ax)
X_moving_down.plot.line(ax=ax,color='black',style='-')
X_moving_up.plot(ax=ax,color='black',style='-')
X[X_mark].plot(ax=ax,color='red',style='.')
#Y.plot()
matplotlib.pyplot.show()
return Y
[docs]
def RemoveOutliersHardThreshold(x_in: pandas.core.frame.DataFrame,
hard_max: float,
hard_min: float,
remove_from_process: list = [],
df_avoid_periods=pandas.DataFrame([])) -> pandas.core.frame.DataFrame:
"""
Removes outliers from the timeseries on each column using threshold.
:param x_in: A pandas.core.frame.DataFrame where the index is of type "pandas.core.indexes.datetime.DatetimeIndex"
and each column contain an electrical quantity time series.
:type x_in: pandas.core.frame.DataFrame
:param hard_max: Max value for the threshold limit
:type hard_max: float
:param hard_min: Min value for the threshold limit
:type hard_min: float
:param remove_from_process: Columns to be kept off the process;
:type remove_from_process: list,optional
:param df_avoid_periods: The first column with the start and the second column with the end date.
:type df_avoid_periods: pandas.core.frame.DataFrame
:return: Y: A pandas.core.frame.DataFrame without the outliers
:rtype: Y: pandas.core.frame.DataFrame
"""
X = x_in.copy(deep=True)
# Remove keep out columns
if len(remove_from_process) > 0:
X = X.drop(remove_from_process, axis=1)
Y = X.copy(deep=True)
Y[Y >= hard_max] = numpy.nan
Y[Y <= hard_min] = numpy.nan
if df_avoid_periods.shape[0] != 0:
df_values, index_return = SavePeriod(X, df_avoid_periods)
Y.loc[index_return, :] = df_values
# return the keep out columns
if len(remove_from_process) > 0:
Y = pandas.concat([Y, x_in.loc[:, remove_from_process]], axis=1)
return Y
[docs]
def RemoveOutliersQuantile(x_in: pandas.core.frame.DataFrame,
remove_from_process: list = [],
df_avoid_periods=pandas.DataFrame([])) -> pandas.core.frame.DataFrame:
"""
Removes outliers from the timeseries on each column using the top and bottom
quantile metric as an outlier marker.
:param x_in: A pandas.core.frame.DataFrame where the index is of type "pandas.core.indexes.datetime.DatetimeIndex"
and each column contain an electrical quantity time series.
:type x_in: pandas.core.frame.DataFrame
:param remove_from_process: Columns to be kept off the process;
:type remove_from_process: list,optional
:param df_avoid_periods: The first column with the start and the second column with the end date.
:type df_avoid_periods: pandas.core.frame.DataFrame
:return: Y: A pandas.core.frame.DataFrame without the outliers
:rtype: Y: pandas.core.frame.DataFrame
"""
X = x_in.copy(deep=True)
# Remove the keep out columns
if len(remove_from_process) > 0:
X = X.drop(remove_from_process, axis=1)
Y = X.copy(deep=True)
for col_name in Y.columns:
q1 = X[col_name].quantile(0.25)
q3 = X[col_name].quantile(0.75)
iqr = q3 - q1 # Inter quartile range
fence_low = q1 - 1.5 * iqr
fence_high = q3 + 1.5 * iqr
Y.loc[(Y[col_name] < fence_low) | (Y[col_name] > fence_high), col_name] = numpy.nan
if df_avoid_periods.shape[0] != 0:
df_values, index_return = SavePeriod(X, df_avoid_periods)
Y.loc[index_return, :] = df_values
# return the keep out columns
if len(remove_from_process) > 0:
Y = pandas.concat([Y, x_in.loc[:, remove_from_process]], axis=1)
return Y
[docs]
def RemoveOutliersHistogram(x_in: pandas.core.frame.DataFrame,
df_avoid_periods: pandas.DataFrame = pandas.DataFrame([]),
remove_from_process: list = [],
sample_freq: int = 5,
min_number_of_samples_limit: int = 12) -> pandas.core.frame.DataFrame:
"""
Removes outliers from the timeseries on each column using the histogram.
The parameter 'min_number_of_samples_limit' specify the minimum amount of hours, if integrate flag is True, or
samples that a value must have to be considered not an outlier.
:param x_in: A pandas.core.frame.DataFrame where the index is of type "pandas.core.indexes.datetime.DatetimeIndex"
and each column contain an electrical quantity time series.
:type x_in: pandas.core.frame.DataFrame
:param remove_from_process: Columns to be kept off the process;
:type remove_from_process: list,optional
:param df_avoid_periods: The first column with the start and the second column with the end date.
:type df_avoid_periods: pandas.core.frame.DataFrame
:param integrate_hour: Makes the analysis on the data integrated to an hour
:type integrate_hour: bool,optional
:param sample_freq: The sample frequency of the time series. Defaults to 5.
:type sample_freq: int,optional
:param min_number_of_samples_limit: The number of samples to be considered valid
:type min_number_of_samples_limit: int,optional
:return: Y: A pandas.core.frame.DataFrame without the outliers
:rtype: Y: pandas.core.frame.DataFrame
"""
X = x_in.copy(deep=True)
# Remove the keep out columns
if len(remove_from_process) > 0:
X = X.drop(remove_from_process, axis=1)
Y = X.copy(deep=True)
for col in X.columns:
X.loc[:,col] = X.loc[:,col].sort_values(ascending=False, ignore_index=True).values
#print(X.copy(deep=True))
if X.shape[0] < min_number_of_samples_limit:
min_number_of_samples_limit = X.shape[0]
threshold_max = X.iloc[min_number_of_samples_limit, :]
threshold_min = X.iloc[-min_number_of_samples_limit - 1, :]
threshold_max = threshold_max.fillna(9999999999)
threshold_min = threshold_min.fillna(0)
#print(f"threshold_max: {threshold_max}")
#print(f"threshold_min: {threshold_min}")
for col in Y.columns:
Y.loc[numpy.logical_or(Y[col] > threshold_max[col], Y[col] < threshold_min[col]), col] = numpy.nan
if df_avoid_periods.shape[0] != 0:
df_values, index_return = SavePeriod(X, df_avoid_periods)
Y.loc[index_return, :] = df_values
# return the keep out columns
if len(remove_from_process) > 0:
Y = pandas.concat([Y, x_in.loc[:, remove_from_process]], axis=1)
return Y