diff --git a/coreforecast/grouped_array.py b/coreforecast/grouped_array.py index a1d2016..f6a1f95 100644 --- a/coreforecast/grouped_array.py +++ b/coreforecast/grouped_array.py @@ -1,6 +1,7 @@ import ctypes import platform import sys +from typing import Union import numpy as np @@ -59,6 +60,13 @@ def __len__(self): def __getitem__(self, i): return self.data[self.indptr[i] : self.indptr[i + 1]] + def _pyfloat_to_c(self, x: float) -> Union[ctypes.c_float, ctypes.c_double]: + if self.prefix == "GroupedArrayFloat32": + out = ctypes.c_float(x) + else: + out = ctypes.c_double(x) + return out + def scaler_fit(self, scaler_type: str) -> np.ndarray: stats = np.full_like(self.data, np.nan, shape=(len(self), 2)) _LIB[f"{self.prefix}_{scaler_type}ScalerStats"]( @@ -116,6 +124,20 @@ def rolling_transform( ) return out + def rolling_quantile_transform( + self, lag: int, p: float, window_size: int, min_samples: int + ) -> np.ndarray: + out = np.full_like(self.data, np.nan) + _LIB[f"{self.prefix}_RollingQuantileTransform"]( + self._handle, + ctypes.c_int(lag), + self._pyfloat_to_c(p), + ctypes.c_int(window_size), + ctypes.c_int(min_samples), + _data_as_void_ptr(out), + ) + return out + def rolling_update( self, stat_name: str, lag: int, window_size: int, min_samples: int ) -> np.ndarray: @@ -129,6 +151,20 @@ def rolling_update( ) return out + def rolling_quantile_update( + self, lag: int, p: float, window_size: int, min_samples: int + ) -> np.ndarray: + out = np.empty_like(self.data, shape=len(self)) + _LIB[f"{self.prefix}_RollingQuantileUpdate"]( + self._handle, + ctypes.c_int(lag), + self._pyfloat_to_c(p), + ctypes.c_int(window_size), + ctypes.c_int(min_samples), + _data_as_void_ptr(out), + ) + return out + def seasonal_rolling_transform( self, stat_name: str, @@ -167,6 +203,46 @@ def seasonal_rolling_update( ) return out + def seasonal_rolling_quantile_transform( + self, + lag: int, + p: float, + season_length: int, + window_size: int, + min_samples: int, + ) -> np.ndarray: + out = np.full_like(self.data, np.nan) + _LIB[f"{self.prefix}_SeasonalRollingQuantileTransform"]( + self._handle, + ctypes.c_int(lag), + ctypes.c_int(season_length), + self._pyfloat_to_c(p), + ctypes.c_int(window_size), + ctypes.c_int(min_samples), + _data_as_void_ptr(out), + ) + return out + + def seasonal_rolling_quantile_update( + self, + lag: int, + p: float, + season_length: int, + window_size: int, + min_samples: int, + ) -> np.ndarray: + out = np.empty_like(self.data, shape=len(self)) + _LIB[f"{self.prefix}_SeasonalRollingQuantileUpdate"]( + self._handle, + ctypes.c_int(lag), + ctypes.c_int(season_length), + self._pyfloat_to_c(p), + ctypes.c_int(window_size), + ctypes.c_int(min_samples), + _data_as_void_ptr(out), + ) + return out + def expanding_transform_with_aggs( self, stat_name: str, @@ -195,6 +271,26 @@ def expanding_transform( ) return out + def expanding_quantile_transform(self, lag: int, p: float) -> np.ndarray: + out = np.full_like(self.data, np.nan) + _LIB[f"{self.prefix}_ExpandingQuantileTransform"]( + self._handle, + ctypes.c_int(lag), + self._pyfloat_to_c(p), + _data_as_void_ptr(out), + ) + return out + + def expanding_quantile_update(self, lag: int, p: float) -> np.ndarray: + out = np.empty_like(self.data, shape=len(self)) + _LIB[f"{self.prefix}_ExpandingQuantileUpdate"]( + self._handle, + ctypes.c_int(lag), + self._pyfloat_to_c(p), + _data_as_void_ptr(out), + ) + return out + def exponentially_weighted_transform( self, stat_name: str, @@ -202,14 +298,10 @@ def exponentially_weighted_transform( alpha: float, ) -> np.ndarray: out = np.full_like(self.data, np.nan) - if self.prefix == "GroupedArrayFloat32": - alpha = ctypes.c_float(alpha) - else: - alpha = ctypes.c_double(alpha) _LIB[f"{self.prefix}_ExponentiallyWeighted{stat_name}Transform"]( self._handle, ctypes.c_int(lag), - alpha, + self._pyfloat_to_c(alpha), _data_as_void_ptr(out), ) return out diff --git a/coreforecast/lag_transforms.py b/coreforecast/lag_transforms.py index 33c5ebf..fe131e5 100644 --- a/coreforecast/lag_transforms.py +++ b/coreforecast/lag_transforms.py @@ -4,14 +4,17 @@ "RollingStd", "RollingMin", "RollingMax", + "RollingQuantile", "SeasonalRollingMean", "SeasonalRollingStd", "SeasonalRollingMin", "SeasonalRollingMax", + "SeasonalRollingQuantile", "ExpandingMean", "ExpandingStd", "ExpandingMin", "ExpandingMax", + "ExpandingQuantile", "ExponentiallyWeightedMean", ] @@ -86,6 +89,24 @@ class RollingMax(RollingBase): stat_name = "Max" +class RollingQuantile(RollingBase): + def __init__( + self, lag: int, p: float, window_size: int, min_samples: Optional[int] = None + ): + super().__init__(lag=lag, window_size=window_size, min_samples=min_samples) + self.p = p + + def transform(self, ga: GroupedArray) -> np.ndarray: + return ga.rolling_quantile_transform( + self.lag, self.p, self.window_size, self.min_samples + ) + + def update(self, ga: GroupedArray) -> np.ndarray: + return ga.rolling_quantile_update( + self.lag - 1, self.p, self.window_size, self.min_samples + ) + + class SeasonalRollingBase(RollingBase): season_length: int @@ -134,6 +155,42 @@ class SeasonalRollingMax(SeasonalRollingBase): stat_name = "Max" +class SeasonalRollingQuantile(SeasonalRollingBase): + def __init__( + self, + lag: int, + p: float, + season_length: int, + window_size: int, + min_samples: Optional[int] = None, + ): + super().__init__( + lag=lag, + season_length=season_length, + window_size=window_size, + min_samples=min_samples, + ) + self.p = p + + def transform(self, ga: GroupedArray) -> np.ndarray: + return ga.seasonal_rolling_quantile_transform( + lag=self.lag, + p=self.p, + season_length=self.season_length, + window_size=self.window_size, + min_samples=self.min_samples, + ) + + def update(self, ga: GroupedArray) -> np.ndarray: + return ga.seasonal_rolling_quantile_update( + lag=self.lag - 1, + p=self.p, + season_length=self.season_length, + window_size=self.window_size, + min_samples=self.min_samples, + ) + + class ExpandingBase(BaseLagTransform): def __init__(self, lag: int): self.lag = lag @@ -193,6 +250,18 @@ class ExpandingMax(ExpandingComp): comp_fn = np.maximum +class ExpandingQuantile(BaseLagTransform): + def __init__(self, lag: int, p: float): + self.lag = lag + self.p = p + + def transform(self, ga: GroupedArray) -> np.ndarray: + return ga.expanding_quantile_transform(self.lag, self.p) + + def update(self, ga: GroupedArray) -> np.ndarray: + return ga.expanding_quantile_update(self.lag - 1, self.p) + + class ExponentiallyWeightedMean(BaseLagTransform): def __init__(self, lag: int, alpha: float): self.lag = lag diff --git a/include/coreforecast.h b/include/coreforecast.h index 819513d..d93bbd7 100644 --- a/include/coreforecast.h +++ b/include/coreforecast.h @@ -106,6 +106,15 @@ GroupedArrayFloat64_RollingMaxTransform(GroupedArrayHandle handle, int lag, int window_size, int min_samples, double *out); +DLL_EXPORT int +GroupedArrayFloat32_RollingQuantileTransform(GroupedArrayHandle handle, int lag, + float p, int window_size, + int min_samples, float *out); +DLL_EXPORT int +GroupedArrayFloat64_RollingQuantileTransform(GroupedArrayHandle handle, int lag, + double p, int window_size, + int min_samples, double *out); + DLL_EXPORT int GroupedArrayFloat32_RollingMeanUpdate(GroupedArrayHandle handle, int lag, int window_size, int min_samples, @@ -142,6 +151,15 @@ DLL_EXPORT int GroupedArrayFloat64_RollingMaxUpdate(GroupedArrayHandle handle, int min_samples, double *out); +DLL_EXPORT int +GroupedArrayFloat32_RollingQuantileUpdate(GroupedArrayHandle handle, int lag, + float p, int window_size, + int min_samples, float *out); +DLL_EXPORT int +GroupedArrayFloat64_RollingQuantileUpdate(GroupedArrayHandle handle, int lag, + double p, int window_size, + int min_samples, double *out); + DLL_EXPORT int GroupedArrayFloat32_SeasonalRollingMeanTransform( GroupedArrayHandle handle, int lag, int season_length, int window_size, int min_samples, float *out); @@ -170,6 +188,13 @@ DLL_EXPORT int GroupedArrayFloat64_SeasonalRollingMaxTransform( GroupedArrayHandle handle, int lag, int season_length, int window_size, int min_samples, double *out); +DLL_EXPORT int GroupedArrayFloat32_SeasonalRollingQuantileTransform( + GroupedArrayHandle handle, int lag, int season_length, float p, + int window_size, int min_samples, float *out); +DLL_EXPORT int GroupedArrayFloat64_SeasonalRollingQuantileTransform( + GroupedArrayHandle handle, int lag, int season_length, double p, + int window_size, int min_samples, double *out); + DLL_EXPORT int GroupedArrayFloat32_SeasonalRollingMeanUpdate( GroupedArrayHandle handle, int lag, int season_length, int window_size, int min_samples, float *out); @@ -204,6 +229,13 @@ GroupedArrayFloat64_SeasonalRollingMaxUpdate(GroupedArrayHandle handle, int lag, int season_length, int window_size, int min_samples, double *out); +DLL_EXPORT int GroupedArrayFloat32_SeasonalRollingQuantileUpdate( + GroupedArrayHandle handle, int lag, int season_length, float p, + int window_size, int min_samples, float *out); +DLL_EXPORT int GroupedArrayFloat64_SeasonalRollingQuantileUpdate( + GroupedArrayHandle handle, int lag, int season_length, double p, + int window_size, int min_samples, double *out); + DLL_EXPORT int GroupedArrayFloat32_ExpandingMeanTransform(GroupedArrayHandle handle, int lag, float *out, float *agg); @@ -232,6 +264,20 @@ DLL_EXPORT int GroupedArrayFloat64_ExpandingMaxTransform(GroupedArrayHandle handle, int lag, double *out); +DLL_EXPORT int +GroupedArrayFloat32_ExpandingQuantileTransform(GroupedArrayHandle handle, + int lag, float p, float *out); +DLL_EXPORT int +GroupedArrayFloat64_ExpandingQuantileTransform(GroupedArrayHandle handle, + int lag, double p, double *out); + +DLL_EXPORT int +GroupedArrayFloat32_ExpandingQuantileUpdate(GroupedArrayHandle handle, int lag, + float p, float *out); +DLL_EXPORT int +GroupedArrayFloat64_ExpandingQuantileUpdate(GroupedArrayHandle handle, int lag, + double p, double *out); + DLL_EXPORT int GroupedArrayFloat32_ExponentiallyWeightedMeanTransform( GroupedArrayHandle handle, int lag, float alpha, float *out); DLL_EXPORT int GroupedArrayFloat64_ExponentiallyWeightedMeanTransform( diff --git a/pyproject.toml b/pyproject.toml index 701272c..3ae3e10 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,5 +43,5 @@ wheel.packages = ["coreforecast"] wheel.py-api = "py3" [tool.cibuildwheel] -test-requires = "pytest window-ops" +test-requires = "pandas pytest window-ops" test-command = "pytest {project}/tests -k correct" diff --git a/requirements-test.txt b/requirements-test.txt index a6a9968..1bc1880 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,6 +1,7 @@ importlib_resources ; python_version < "3.10" numba numpy +pandas pytest pytest-benchmark utilsforecast>=0.0.7 diff --git a/src/coreforecast.cpp b/src/coreforecast.cpp index 9042b0a..8556834 100644 --- a/src/coreforecast.cpp +++ b/src/coreforecast.cpp @@ -70,6 +70,17 @@ template inline T Quantile(T *data, T p, int n) { return out; } +template inline T SortedQuantile(T *data, T p, int n) { + T i_plus_g = p * (n - 1); + int i = static_cast(i_plus_g); + T g = i_plus_g - i; + T out = data[i]; + if (g > 0.0) { + out += g * (data[i + 1] - out); + } + return out; +} + template inline void RobustScalerIqrStats(const T *data, int n, T *stats) { T *buffer = new T[n]; @@ -196,10 +207,66 @@ template struct RollingMaxTransform { } }; -template +template +inline void RollingQuantileTransform(const T *data, int n, T *out, + int window_size, int min_samples, T p) { + int upper_limit = std::min(window_size, n); + T *buffer = new T[upper_limit]; + int *positions = new int[upper_limit]; + min_samples = std::min(min_samples, upper_limit); + for (int i = 0; i < min_samples - 1; ++i) { + buffer[i] = data[i]; + positions[i] = i; + out[i] = std::numeric_limits::quiet_NaN(); + } + if (min_samples > 2) { + std::sort(buffer, buffer + min_samples - 2); + } + for (int i = min_samples - 1; i < upper_limit; ++i) { + int idx = std::lower_bound(buffer, buffer + i, data[i]) - buffer; + for (int j = 0; j < i - idx; ++j) { + buffer[i - j] = buffer[i - j - 1]; + positions[i - j] = positions[i - j - 1]; + } + buffer[idx] = data[i]; + positions[idx] = i; + out[i] = SortedQuantile(buffer, p, i + 1); + } + for (int i = window_size; i < n; ++i) { + int remove_idx = + std::min_element(positions, positions + window_size) - positions; + int idx; + if (data[i] <= buffer[remove_idx]) { + idx = std::lower_bound(buffer, buffer + remove_idx, data[i]) - buffer; + for (int j = 0; j < remove_idx - idx; ++j) { + buffer[remove_idx - j] = buffer[remove_idx - j - 1]; + positions[remove_idx - j] = positions[remove_idx - j - 1]; + } + } else { + idx = (std::lower_bound(buffer + remove_idx - 1, buffer + window_size, + data[i]) - + buffer) - + 1; + if (idx == window_size) { + --idx; + } + for (int j = 0; j < idx - remove_idx; ++j) { + buffer[remove_idx + j] = buffer[remove_idx + j + 1]; + positions[remove_idx + j] = positions[remove_idx + j + 1]; + } + } + buffer[idx] = data[i]; + positions[idx] = i; + out[i] = SortedQuantile(buffer, p, window_size); + } + delete[] buffer; + delete[] positions; +} + +template inline void SeasonalRollingTransform(Func RollingTfm, const T *data, int n, T *out, int season_length, int window_size, - int min_samples) { + int min_samples, Args &&...args) { int buff_size = n / season_length + (n % season_length > 0); T *season_data = new T[buff_size]; T *season_out = new T[buff_size]; @@ -209,7 +276,8 @@ inline void SeasonalRollingTransform(Func RollingTfm, const T *data, int n, for (int j = 0; j < season_n; ++j) { season_data[j] = data[i + j * season_length]; } - RollingTfm(season_data, season_n, season_out, window_size, min_samples); + RollingTfm(season_data, season_n, season_out, window_size, min_samples, + std::forward(args)...); for (int j = 0; j < season_n; ++j) { out[i + j * season_length] = season_out[j]; } @@ -250,16 +318,25 @@ template struct SeasonalRollingMaxTransform { } }; -template +template struct SeasonalRollingQuantileTransform { + void operator()(const T *data, int n, T *out, int season_length, + int window_size, int min_samples, T p) { + SeasonalRollingTransform(RollingQuantileTransform, data, n, out, + season_length, window_size, min_samples, p); + } +}; + +template inline void RollingUpdate(Func RollingTfm, const T *data, int n, T *out, - int window_size, int min_samples) { + int window_size, int min_samples, Args &&...args) { if (n < min_samples) { *out = std::numeric_limits::quiet_NaN(); return; } int n_samples = std::min(window_size, n); T *buffer = new T[n_samples]; - RollingTfm(data + n - n_samples, n_samples, buffer, window_size, min_samples); + RollingTfm(data + n - n_samples, n_samples, buffer, window_size, min_samples, + std::forward(args)...); *out = buffer[n_samples - 1]; delete[] buffer; } @@ -296,10 +373,18 @@ template struct RollingMaxUpdate { } }; -template +template struct RollingQuantileUpdate { + void operator()(const T *data, int n, T *out, int window_size, + int min_samples, T p) { + RollingUpdate(RollingQuantileTransform, data, n, out, window_size, + min_samples, p); + } +}; + +template inline void SeasonalRollingUpdate(Func RollingUpdate, const T *data, int n, T *out, int season_length, int window_size, - int min_samples) { + int min_samples, Args &&...args) { int season = n % season_length; int season_n = n / season_length + (season > 0); if (season_n < min_samples) { @@ -311,7 +396,8 @@ inline void SeasonalRollingUpdate(Func RollingUpdate, const T *data, int n, for (int i = 0; i < n_samples; ++i) { season_data[i] = data[n - 1 - (n_samples - 1 - i) * season_length]; } - RollingUpdate(season_data, n_samples, out, window_size, min_samples); + RollingUpdate(season_data, n_samples, out, window_size, min_samples, + std::forward(args)...); delete[] season_data; } @@ -347,6 +433,14 @@ template struct SeasonalRollingMaxUpdate { } }; +template struct SeasonalRollingQuantileUpdate { + void operator()(const T *data, int n, T *out, int season_length, + int window_size, int min_samples, T p) { + SeasonalRollingUpdate(RollingQuantileUpdate(), data, n, out, + season_length, window_size, min_samples, p); + } +}; + template inline void ExpandingMeanTransform(const T *data, int n, T *out, T *agg) { T accum = static_cast(0.0); @@ -374,6 +468,19 @@ template struct ExpandingMaxTransform { } }; +template +inline void ExpandingQuantileTransform(const T *data, int n, T *out, T p) { + RollingQuantileTransform(data, n, out, n, 1, p); +} + +template +inline void ExpandingQuantileUpdate(const T *data, int n, T *out, T p) { + T *buffer = new T[n]; + std::copy(data, data + n, buffer); + *out = Quantile(buffer, p, n); + delete[] buffer; +} + template inline void ExponentiallyWeightedMeanTransform(const T *data, int n, T *out, T alpha) { @@ -469,13 +576,13 @@ template class GroupedArray { int GroupedArrayFloat32_Create(const float *data, indptr_t n_data, indptr_t *indptr, indptr_t n_indptr, int num_threads, GroupedArrayHandle *out) { - *out = new GroupedArray(data, n_data, indptr, n_indptr, num_threads); + *out = new GroupedArray(data, n_data, indptr, n_indptr, num_threads); return 0; } int GroupedArrayFloat64_Create(const double *data, indptr_t n_data, indptr_t *indptr, indptr_t n_indptr, int num_threads, GroupedArrayHandle *out) { - *out = new GroupedArray(data, n_data, indptr, n_indptr, num_threads); + *out = new GroupedArray(data, n_data, indptr, n_indptr, num_threads); return 0; } @@ -660,6 +767,25 @@ int GroupedArrayFloat64_RollingMaxTransform(GroupedArrayHandle handle, int lag, return 0; } +int GroupedArrayFloat32_RollingQuantileTransform(GroupedArrayHandle handle, + int lag, float p, + int window_size, + int min_samples, float *out) { + auto ga = reinterpret_cast *>(handle); + ga->Transform(RollingQuantileTransform, lag, out, window_size, + min_samples, p); + return 0; +} +int GroupedArrayFloat64_RollingQuantileTransform(GroupedArrayHandle handle, + int lag, double p, + int window_size, + int min_samples, double *out) { + auto ga = reinterpret_cast *>(handle); + ga->Transform(RollingQuantileTransform, lag, out, window_size, + min_samples, p); + return 0; +} + int GroupedArrayFloat32_RollingMeanUpdate(GroupedArrayHandle handle, int lag, int window_size, int min_samples, float *out) { @@ -721,6 +847,24 @@ int GroupedArrayFloat64_RollingMaxUpdate(GroupedArrayHandle handle, int lag, return 0; } +int GroupedArrayFloat32_RollingQuantileUpdate(GroupedArrayHandle handle, + int lag, float p, int window_size, + int min_samples, float *out) { + auto ga = reinterpret_cast *>(handle); + ga->Reduce(RollingQuantileUpdate(), 1, out, lag, window_size, + min_samples, p); + return 0; +} +int GroupedArrayFloat64_RollingQuantileUpdate(GroupedArrayHandle handle, + int lag, double p, + int window_size, int min_samples, + double *out) { + auto ga = reinterpret_cast *>(handle); + ga->Reduce(RollingQuantileUpdate(), 1, out, lag, window_size, + min_samples, p); + return 0; +} + int GroupedArrayFloat32_SeasonalRollingMeanTransform(GroupedArrayHandle handle, int lag, int season_length, int window_size, @@ -805,6 +949,23 @@ int GroupedArrayFloat64_SeasonalRollingMaxTransform(GroupedArrayHandle handle, return 0; } +int GroupedArrayFloat32_SeasonalRollingQuantileTransform( + GroupedArrayHandle handle, int lag, int season_length, float p, + int window_size, int min_samples, float *out) { + auto ga = reinterpret_cast *>(handle); + ga->Transform(SeasonalRollingQuantileTransform(), lag, out, + season_length, window_size, min_samples, p); + return 0; +} +int GroupedArrayFloat64_SeasonalRollingQuantileTransform( + GroupedArrayHandle handle, int lag, int season_length, double p, + int window_size, int min_samples, double *out) { + auto ga = reinterpret_cast *>(handle); + ga->Transform(SeasonalRollingQuantileTransform(), lag, out, + season_length, window_size, min_samples, p); + return 0; +} + int GroupedArrayFloat32_SeasonalRollingMeanUpdate(GroupedArrayHandle handle, int lag, int season_length, int window_size, @@ -890,6 +1051,25 @@ int GroupedArrayFloat64_SeasonalRollingMaxUpdate(GroupedArrayHandle handle, return 0; } +int GroupedArrayFloat32_SeasonalRollingQuantileUpdate( + GroupedArrayHandle handle, int lag, int season_length, float p, + int window_size, int min_samples, float *out) { + + auto ga = reinterpret_cast *>(handle); + ga->Reduce(SeasonalRollingQuantileUpdate(), 1, out, lag, season_length, + window_size, min_samples, p); + return 0; +} +int GroupedArrayFloat64_SeasonalRollingQuantileUpdate( + GroupedArrayHandle handle, int lag, int season_length, double p, + int window_size, int min_samples, double *out) { + + auto ga = reinterpret_cast *>(handle); + ga->Reduce(SeasonalRollingQuantileUpdate(), 1, out, lag, + season_length, window_size, min_samples, p); + return 0; +} + int GroupedArrayFloat32_ExpandingMeanTransform(GroupedArrayHandle handle, int lag, float *out, float *agg) { @@ -947,6 +1127,35 @@ int GroupedArrayFloat64_ExpandingMaxTransform(GroupedArrayHandle handle, return 0; } +int GroupedArrayFloat32_ExpandingQuantileTransform(GroupedArrayHandle handle, + int lag, float p, + float *out) { + auto ga = reinterpret_cast *>(handle); + ga->Transform(ExpandingQuantileTransform, lag, out, p); + return 0; +} +int GroupedArrayFloat64_ExpandingQuantileTransform(GroupedArrayHandle handle, + int lag, double p, + double *out) { + auto ga = reinterpret_cast *>(handle); + ga->Transform(ExpandingQuantileTransform, lag, out, p); + return 0; +} + +int GroupedArrayFloat32_ExpandingQuantileUpdate(GroupedArrayHandle handle, + int lag, float p, float *out) { + auto ga = reinterpret_cast *>(handle); + ga->Reduce(ExpandingQuantileUpdate, 1, out, lag, p); + return 0; +} +int GroupedArrayFloat64_ExpandingQuantileUpdate(GroupedArrayHandle handle, + int lag, double p, + double *out) { + auto ga = reinterpret_cast *>(handle); + ga->Reduce(ExpandingQuantileUpdate, 1, out, lag, p); + return 0; +} + int GroupedArrayFloat32_ExponentiallyWeightedMeanTransform( GroupedArrayHandle handle, int lag, float alpha, float *out) { auto ga = reinterpret_cast *>(handle); diff --git a/tests/test_lag_transforms.py b/tests/test_lag_transforms.py index 4b246e6..dbfb8b5 100644 --- a/tests/test_lag_transforms.py +++ b/tests/test_lag_transforms.py @@ -1,4 +1,5 @@ import numpy as np +import pandas as pd import pytest from window_ops.expanding import * from window_ops.ewm import ewm_mean @@ -32,6 +33,29 @@ def transform(data, indptr, updates_only, lag, func, *args) -> np.ndarray: return out +def pd_rolling_quantile(x, lag, p, window_size, min_samples): + return ( + pd.Series(x) + .shift(lag) + .rolling(window=window_size, min_periods=min_samples) + .quantile(p) + ) + + +def pd_seasonal_rolling_quantile(x, lag, p, season_length, window_size, min_samples): + out = np.empty_like(x) + x = pd.Series(x).shift(lag).to_numpy() + for season in range(season_length): + out[season::season_length] = pd_rolling_quantile( + x[season::season_length], 0, p, window_size, min_samples + ) + return out + + +def pd_expanding_quantile(x, lag, p): + return pd.Series(x).shift(lag).expanding().quantile(p) + + @pytest.fixture def data(): return 10 * np.random.rand(indptr[-1]) @@ -91,3 +115,37 @@ def test_correctness(data, comb, dtype): wres = transform(data, indptr, True, lag - 1, wf, *args) cres = cobj.update(ga) np.testing.assert_allclose(wres, cres, rtol=rtol) + + +@pytest.mark.parametrize("window_type", ["rolling", "seasonal_rolling", "expanding"]) +@pytest.mark.parametrize("dtype", [np.float32, np.float64]) +@pytest.mark.parametrize("p", [0.01, 0.1, 0.5, 0.9, 0.99]) +def test_correctness_quantiles(data, dtype, p, window_type): + rtol = 1e-5 if dtype == np.float32 else 1e-7 + data = data.astype(dtype, copy=True) + ga = GroupedArray(data, indptr) + if window_type == "rolling": + core_cls = RollingQuantile(lag, p, window_size, min_samples) + pd_fun = pd_rolling_quantile + pd_kwargs = dict(window_size=window_size, min_samples=min_samples) + elif window_type == "seasonal_rolling": + core_cls = SeasonalRollingQuantile( + lag, p, season_length, window_size, min_samples + ) + pd_fun = pd_seasonal_rolling_quantile + pd_kwargs = dict( + season_length=season_length, + window_size=window_size, + min_samples=min_samples, + ) + else: + core_cls = ExpandingQuantile(lag, p) + pd_fun = pd_expanding_quantile + pd_kwargs = {} + cres = core_cls.transform(ga) + core_cls.lag = lag + 1 + cres_upd = core_cls.update(ga) + pres = np.hstack([pd_fun(ga[i], lag, p, **pd_kwargs) for i in range(len(ga))]) + pres_upd = pres[indptr[1:] - 1] + np.testing.assert_allclose(cres, pres, rtol=rtol) + np.testing.assert_allclose(cres_upd, pres_upd, rtol=rtol)