Shuffling fails with Pandas string types · Issue #8011 · dask/dask · GitHub

Shuffling fails with Pandas string types #8011

Closed
mrocklin opened this issue Aug 8, 2021 · 2 comments · Fixed by #9566

Comments

@mrocklin
Member
mrocklin commented Aug 8, 2021

If we have a Pandas dataframe with a string dtype (not object), then our quantiles code doesn't know what to do. We should either call astype(object) (a good short-term fix) or figure out a nicer way to handle this.

import dask.dataframe as dd, pandas as pd
df = pd.DataFrame({"x": range(5), "y": "abcde"})
df["y"] = df.y.astype("string")
ddf = dd.from_pandas(df, npartitions=1)
ddf.set_index("y")
TypeError                          Traceback (most recent call last)
<ipython-input-6-b96d10f4863b> in <module>
----> 1 ddf.set_index("y")

~/workspace/dask/dask/dataframe/core.py in set_index(***failed resolving arguments***)
   4226             from .shuffle import set_index
   4227 
-> 4228             return set_index(
   4229                 self,
   4230                 other,

~/workspace/dask/dask/dataframe/shuffle.py in set_index(df, index, npartitions, shuffle, compute, drop, upsample, divisions, partition_size, **kwargs)
    160 
    161     if divisions is None:
--> 162         divisions, mins, maxes = _calculate_divisions(
    163             df, index2, repartition, npartitions, upsample, partition_size
    164         )

~/workspace/dask/dask/dataframe/shuffle.py in _calculate_divisions(df, partition_col, repartition, npartitions, upsample, partition_size)
     33     mins = partition_col.map_partitions(M.min)
     34     maxes = partition_col.map_partitions(M.max)
---> 35     divisions, sizes, mins, maxes = base.compute(divisions, sizes, mins, maxes)
     36     divisions = methods.tolist(divisions)
     37     if type(sizes) is not list:

~/workspace/dask/dask/base.py in compute(*args, **kwargs)
    566         postcomputes.append(x.__dask_postcompute__())
    567 
--> 568     results = schedule(dsk, keys, **kwargs)
    569     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    570 

~/workspace/dask/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
     77             pool = MultiprocessingPoolExecutor(pool)
     78 
---> 79     results = get_async(
     80         pool.submit,
     81         pool._max_workers,

~/workspace/dask/dask/local.py in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    512                             _execute_task(task, data)  # Re-execute locally
    513                         else:
--> 514                             raise_exception(exc, tb)
    515                     res, worker_id = loads(res_info)
    516                     state["cache"][key] = res

~/workspace/dask/dask/local.py in reraise(exc, tb)
    323     if exc.__traceback__ is not tb:
    324         raise exc.with_traceback(tb)
--> 325     raise exc
    326 
    327 

~/workspace/dask/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    221     try:
    222         task, data = loads(task_info)
--> 223         result = _execute_task(task, data)
    224         id = get_id()
    225         result = dumps((result, id))

~/workspace/dask/dask/core.py in _execute_task(arg, cache, dsk)
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

~/workspace/dask/dask/dataframe/partitionquantiles.py in percentiles_summary(df, num_old, num_new, upsample, state)
    414         data = data.codes
    415         interpolation = "nearest"
--> 416     elif np.issubdtype(data.dtype, np.integer) and not is_cupy_type(data):
    417         # CuPy doesn't currently support "nearest" interpolation,
    418         # so it's special cased in the condition above.

~/mambaforge/lib/python3.9/site-packages/numpy/core/numerictypes.py in issubdtype(arg1, arg2)
    416     """
    417     if not issubclass_(arg1, generic):
--> 418         arg1 = dtype(arg1).type
    419     if not issubclass_(arg2, generic):
    420         arg2 = dtype(arg2).type

TypeError: Cannot interpret 'string[python]' as a data type

cc @TomAugspurger for advice
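
For reference, a minimal sketch of the short-term workaround mentioned above (casting the column back to object before setting the index); the variable names just mirror the reproducer and are not from the issue:

import dask.dataframe as dd
import pandas as pd

df = pd.DataFrame({"x": range(5), "y": list("abcde")})
df["y"] = df.y.astype("string")
ddf = dd.from_pandas(df, npartitions=1)

# Cast the extension "string" column back to object so the quantiles code
# sees a plain object-dtype ndarray, then set the index as usual.
ddf = ddf.assign(y=ddf.y.astype(object)).set_index("y")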

@TomAugspurger
Member

At some point, I think these operations will need to move from using NumPy operations on ndarrays to using methods on the Series / extension array, at least if we want to support arbitrary extension arrays.

Regardless, once we get past the partitionquantiles stage, set_index still won't work since pandas doesn't support extension types like string in the index yet: pandas-dev/pandas#22861.
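
To illustrate the point in miniature (a sketch, not the actual dask code path): the check that fails in partitionquantiles.py goes through NumPy's dtype machinery, while pandas' own dtype introspection already understands extension dtypes:

import numpy as np
import pandas as pd
from pandas.api import types as pdt

s = pd.Series(["a", "b", "c"], dtype="string")

# The NumPy-based check from the traceback fails on extension dtypes:
# np.issubdtype(s.dtype, np.integer)  # TypeError: Cannot interpret 'string[python]' as a data type

# pandas-aware checks handle them fine:
print(pdt.is_integer_dtype(s.dtype))  # False
print(pdt.is_string_dtype(s.dtype))   # True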

@jorloplaz
Contributor

From Pandas 1.4 onwards, set_index should work. So it should suffice to modify the partitioning code.
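
A quick check of that claim (a sketch, assuming pandas >= 1.4, where an Index can hold extension arrays):

import pandas as pd

df = pd.DataFrame({"x": range(3), "y": pd.array(list("abc"), dtype="string")})
print(df.set_index("y").index.dtype)  # string on pandas >= 1.4 (object on older versions)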
