8000 ENH: Add online operations for EWM.mean by mroeschke · Pull Request #41888 · pandas-dev/pandas · GitHub
[go: up one dir, main page]

Skip to content

ENH: Add online operations for EWM.mean #41888

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 30 commits into from
Jun 12, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
e195c58
Add scaffolding for online EWM
mroeschke May 31, 2021
3d95167
Merge remote-tracking branch 'upstream/master' into online/ewm
mroeschke May 31, 2021
9354bd0
Add online op and new methods and class
mroeschke Jun 7, 2021
0ce197d
Make signatures match
mroeschke Jun 7, 2021
8096cc6
Add some tests, rename some variables
mroeschke Jun 7, 2021
a5273b9
Add newline for readability
mroeschke Jun 7, 2021
bab78cc
Parameterize over adjust and ignore_na
mroeschke Jun 7, 2021
d72a03e
Test resetting in tests
mroeschke Jun 7, 2021
0b7e773
Add test with invalid update
mroeschke Jun 7, 2021
8444b42
Add docstring for mean
mroeschke Jun 7, 2021
7847373
Add docstring for online
mroeschke Jun 7, 2021
df13b55
Parameterize over dataframe and series
mroeschke Jun 7, 2021
57db06e
Generalize axis call for update_times
mroeschke Jun 7, 2021
329dbd2
Remove comments
mroeschke Jun 7, 2021
9594afe
Merge remote-tracking branch 'upstream/master' into online/ewm
mroeschke Jun 8, 2021
28be18a
Add more test and ensure constructions
mroeschke Jun 8, 2021
85025ff
Passing all the non-time tests
mroeschke Jun 8, 2021
3345271
Add whatsnew and window.rst; xfail update_times
mroeschke Jun 9, 2021
2186ea0
Merge remote-tracking branch 'upstream/master' into online/ewm
mroeschke Jun 9, 2021
8024a7b
mypy
mroeschke Jun 9, 2021
80c8b7f
Merge remote-tracking branch 'upstream/master' into online/ewm
mroeschke Jun 9, 2021
8a5b0b9
Address comments
mroeschke Jun 9, 2021
e790947
Fix doctest
mroeschke Jun 9, 2021
916e68b
Merge remote-tracking branch 'upstream/master' into online/ewm
mroeschke Jun 11, 2021
175c4ca
Fix doctest
mroeschke Jun 11, 2021
f799a0f
Merge remote-tracking branch 'upstream/master' into online/ewm
mroeschke Jun 11, 2021
c8b09b6
Merge remote-tracking branch 'upstream/master' into online/ewm
mroeschke Jun 11, 2021
2cb4019
Cannot parallelize a loop
mroeschke Jun 11, 2021
fea8b0b
Trigger CI
mroeschke Jun 11, 2021
04ea064
Merge remote-tracking branch 'upstream/master' into online/ewm
mroeschke Jun 12, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add online op and new methods and class
  • Loading branch information
mroeschke committed Jun 7, 2021
commit 9354bd0d261580c02459f8ed71f641d2c227c116
10 changes: 6 additions & 4 deletions pandas/core/window/ewm.py
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,9 @@ def __init__(
times=times,
selection=selection,
)
self._mean = EWMeanState(self._com, self.adjust, self.ignore_na)
self._mean = EWMeanState(
self._com, self.adjust, self.ignore_na, self.axis, obj.shape
)
self.engine = engine
self.engine_kwargs = engine_kwargs

Expand Down Expand Up @@ -747,7 +749,7 @@ def cov(
def var(self, bias: bool = False, *args, **kwargs):
return NotImplementedError

def mean(self, engine=None, engine_kwargs=None, update=None, update_deltas=None):
def mean(self, update=None, update_deltas=None):
if update is not None:
if self._mean.last_ewm is None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A user needs to call mean() first then can call mean(update=new_df)

This checks that mean() was called first.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, and test for this? (with good error message)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

raise ValueError(
Expand All @@ -756,11 +758,11 @@ def mean(self, engine=None, engine_kwargs=None, update=None, update_deltas=None)
obj = np.concatenate(([self._mean.last_ewm], update.to_numpy()))
result_from = 1
else:
obj = self._selected_obj.to_numpy()
obj = self._selected_obj.astype(np.float64).to_numpy()
result_from = 0
if update_deltas is None:
update_deltas = np.ones(max(len(obj) - 1, 0), dtype=np.float64)
ewma_func = generate_online_numba_ewma_func(engine_kwargs)
ewma_func = generate_online_numba_ewma_func(self.engine_kwargs)
result = self._mean.run_ewm(obj, update_deltas, self.min_periods, ewma_func)
result = self._selected_obj._constructor(result)
return result.iloc[result_from:]
73 changes: 36 additions & 37 deletions pandas/core/window/online.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,56 +40,55 @@ def online_ewma(
minimum_periods: int,
old_wt_factor: float,
new_wt: float,
old_wt: float,
old_wt: np.ndarray,
adjust: bool,
ignore_na: bool,
):
result = np.empty(len(values))

result = np.empty(values.shape)
weighted_avg = values[0]
nobs = int(not np.isnan(weighted_avg))
result[0] = weighted_avg if nobs >= minimum_periods else np.nan

for j in range(1, len(values)):
cur = values[j]
is_observation = not np.isnan(cur)
nobs += is_observation
if not np.isnan(weighted_avg):

if is_observation or not ignore_na:

# note that len(deltas) = len(vals) - 1 and deltas[i] is to be
# used in conjunction with vals[i+1]
old_wt *= old_wt_factor ** deltas[j - 1]
if is_observation:

# avoid numerical errors on constant series
if weighted_avg != cur:
weighted_avg = (
(old_wt * weighted_avg) + (new_wt * cur)
) / (old_wt + new_wt)
if adjust:
old_wt += new_wt
else:
old_wt = 1.0
elif is_observation:
weighted_avg = cur

result[j] = weighted_avg if nobs >= minimum_periods else np.nan
nobs = (~np.isnan(weighted_avg)).astype(np.int64)
result[0] = np.where(nobs >= minimum_periods, weighted_avg, np.nan)

for i in range(1, len(values)):
cur = values[i]
is_observations = ~np.isnan(cur)
nobs += is_observations.astype(np.int64)
for j in range(len(cur)):
if not np.isnan(weighted_avg[j]):
if is_observations[j] or not ignore_na:
# note that len(deltas) = len(vals) - 1 and deltas[i] is to be
# used in conjunction with vals[i+1]
old_wt[j] *= old_wt_factor ** deltas[j - 1]
if is_observations[j]:
# avoid numerical errors on constant series
if weighted_avg[j] != cur[j]:
weighted_avg[j] = (
(old_wt[j] * weighted_avg[j]) + (new_wt * cur[j])
) / (old_wt[j] + new_wt)
if adjust:
old_wt[j] += new_wt
else:
old_wt[j] = 1.0
elif is_observations[j]:
weighted_avg[j] = cur[j]

result[i] = np.where(nobs >= minimum_periods, weighted_avg, np.nan)

return result, old_wt

return online_ewma


class EWMeanState:
def __init__(self, com, adjust, ignore_na):
def __init__(self, com, adjust, ignore_na, axis, shape):
alpha = 1.0 / (1.0 + com)
self.old_wt_factor = 1.0 - alpha
self.new_wt = 1.0 if adjust else alpha
self.old_wt = 1.0
self.axis = axis
self.shape = shape
self.adjust = adjust
self.ignore_na = ignore_na
self.new_wt = 1.0 if adjust else alpha
self.old_wt_factor = 1.0 - alpha
self.old_wt = np.ones(self.shape[self.axis - 1])
self.last_ewm = None

def run_ewm(self, weighted_avg, deltas, min_periods, ewm_func):
Expand All @@ -108,5 +107,5 @@ def run_ewm(self, weighted_avg, deltas, min_periods, ewm_func):
return result

def reset(self):
self.old_wt = 1
self.old_wt = np.ones(self.shape[self.axis - 1])
self.last_ewm = None
0