Less blocks in groupby by WillAyd · Pull Request #29753 · pandas-dev/pandas · GitHub
[go: up one dir, main page]

Skip to content

Less blocks in groupby #29753

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 11 commits into from
Prev Previous commit
Next Next commit
touching internals
  • Loading branch information
WillAyd committed Nov 21, 2019
commit d04a0890ee0be6ee09d1d3c75fc10134327b1b4e
123 changes: 8 additions & 115 deletions pandas/core/groupby/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -982,122 +982,15 @@ def _iterate_slices(self) -> Iterable[Series]:
def _cython_agg_general(
self, how: str, alt=None, numeric_only: bool = True, min_count: int = -1
):
new_items, new_blocks = self._cython_agg_blocks(
how, alt=alt, numeric_only=numeric_only, min_count=min_count
)
return self._wrap_agged_blocks(new_items, new_blocks)

def _cython_agg_blocks(
self, how: str, alt=None, numeric_only: bool = True, min_count: int = -1
):
# TODO: the actual managing of mgr_locs is a PITA
# here, it should happen via BlockManager.combine

data = self._get_data_to_aggregate()

if numeric_only:
data = data.get_numeric_data(copy=False)

new_blocks = []
new_items = []
deleted_items = []
no_result = object()
for block in data.blocks:
# Avoid inheriting result from earlier in the loop
result = no_result
locs = block.mgr_locs.as_array
try:
result, _ = self.grouper.aggregate(
block.values, how, axis=1, min_count=min_count
)
except NotImplementedError:
# generally if we have numeric_only=False
# and non-applicable functions
# try to python agg

if alt is None:
# we cannot perform the operation
# in an alternate way, exclude the block
assert how == "ohlc"
deleted_items.append(locs)
continue

# call our grouper again with only this block
obj = self.obj[data.items[locs]]
if obj.shape[1] == 1:
# Avoid call to self.values that can occur in DataFrame
# reductions; see GH#28949
obj = obj.iloc[:, 0]

s = get_groupby(obj, self.grouper)
try:
result = s.aggregate(lambda x: alt(x, axis=self.axis))
except TypeError:
# we may have an exception in trying to aggregate
# continue and exclude the block
deleted_items.append(locs)
continue
else:
result = cast(DataFrame, result)
# unwrap DataFrame to get array
assert len(result._data.blocks) == 1
result = result._data.blocks[0].values
if isinstance(result, np.ndarray) and result.ndim == 1:
result = result.reshape(1, -1)

finally:
assert not isinstance(result, DataFrame)

if result is not no_result:
# see if we can cast the block back to the original dtype
result = maybe_downcast_numeric(result, block.dtype)

if block.is_extension and isinstance(result, np.ndarray):
# e.g. block.values was an IntegerArray
# (1, N) case can occur if block.values was Categorical
# and result is ndarray[object]
assert result.ndim == 1 or result.shape[0] == 1
try:
# Cast back if feasible
result = type(block.values)._from_sequence(
result.ravel(), dtype=block.values.dtype
)
except ValueError:
# reshape to be valid for non-Extension Block
result = result.reshape(1, -1)

newb = block.make_block(result)

new_items.append(locs)
new_blocks.append(newb)

if len(new_blocks) == 0:
raise DataError("No numeric types to aggregate")

# reset the locs in the blocks to correspond to our
# current ordering
indexer = np.concatenate(new_items)
new_items = data.items.take(np.sort(indexer))

if len(deleted_items):

# we need to adjust the indexer to account for the
# items we have removed
# really should be done in internals :<

deleted = np.concatenate(deleted_items)
ai = np.arange(len(data))
mask = np.zeros(len(data))
mask[deleted] = 1
indexer = (ai - mask.cumsum())[indexer]

offset = 0
for b in new_blocks:
loc = len(b.mgr_locs)
b.mgr_locs = indexer[offset : (offset + loc)]
offset += loc
func = partial(self.grouper.aggregate, how=how, axis=1, min_count=min_count)
results = self._selected_obj._data.apply(func)
df = DataFrame(results)
if self.as_index:
df.index = self.grouper.result_index
else:
df.index = np.arange(result[0].values.shape[1])

return new_items, new_blocks
return df

def _aggregate_frame(self, func, *args, **kwargs) -> DataFrame:
if self.grouper.nkeys != 1:
Expand Down
21 changes: 16 additions & 5 deletions pandas/core/internals/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,20 +419,31 @@ def apply(
and hasattr(kwargs[k], "values")
}

for b in self.blocks:
for blk in self.blocks:
if filter is not None:
if not b.mgr_locs.isin(filter_locs).any():
result_blocks.append(b)
if not blk.mgr_locs.isin(filter_locs).any():
result_blocks.append(blk)
continue

if aligned_args:
b_items = self.items[b.mgr_locs.indexer]
b_items = self.items[blk.mgr_locs.indexer]

for k, obj in aligned_args.items():
axis = obj._info_axis_number
kwargs[k] = obj.reindex(b_items, axis=axis, copy=align_copy)

applied = getattr(b, f)(**kwargs)
if isinstance(f, str):
applied = getattr(blk, f)(**kwargs)
else: # partial; specific to groupby
# TODO: func should only return one value; need to remove
# ohlc from groupby semantics to accomplish generically
result, _ = f(blk.values) # better way?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better way?

blk.apply

if result.ndim != 2: # hmm this is hacky
result = result.reshape(-1, 1)

applied = type(blk)(result, placement=blk.mgr_locs, ndim=2)
axes = [self.axes[0], np.arange(result.shape[1])]

result_blocks = _extend_blocks(applied, result_blocks)

if len(result_blocks) == 0:
Expand Down
3 changes: 3 additions & 0 deletions pandas/tests/groupby/test_whitelist.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ def test_regression_whitelist_methods(raw_frame, op, level, axis, skipna, sort):
# GH 17537
# explicitly test the whitelist methods

if op == "median":
pytest.skip("Currently segfaulting...")

if axis == 0:
frame = raw_frame
else:
Expand Down
0