Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for {DataFrame,Series}.mode #3361

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/source/reference/dataframe/frame.rst
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ Computations / descriptive stats
DataFrame.max
DataFrame.mean
DataFrame.min
DataFrame.mode
DataFrame.nunique
DataFrame.pct_change
DataFrame.prod
Expand Down
1 change: 1 addition & 0 deletions docs/source/reference/dataframe/series.rst
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ Computations / descriptive stats
Series.mean
Series.median
Series.min
Series.mode
Series.pct_change
Series.prod
Series.product
Expand Down
2 changes: 1 addition & 1 deletion mars/dataframe/base/bloom_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def _build_dataframe_filter(cls, in_data: pd.DataFrame, op: "DataFrameBloomFilte
def _convert_to_hashable_dtypes(cls, dtypes: pd.Series):
dtypes = dict(
(name, dtype) if np.issubdtype(dtype, int) else (name, str)
for name, dtype in dtypes.iteritems()
for name, dtype in dtypes.items()
)
return dtypes

Expand Down
2 changes: 2 additions & 0 deletions mars/dataframe/reduction/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def _install():
from .skew import skew_dataframe, skew_series
from .kurtosis import kurt_dataframe, kurt_series
from .reduction_size import size_dataframe, size_series
from .mode import mode_dataframe, mode_series

funcs = [
("sum", sum_series, sum_dataframe),
Expand All @@ -88,6 +89,7 @@ def _install():
("kurtosis", kurt_series, kurt_dataframe),
("unique", unique, None),
("_reduction_size", size_dataframe, size_series),
("mode", mode_series, mode_dataframe),
]
for func_name, series_func, df_func in funcs:
if df_func is not None: # pragma: no branch
Expand Down
141 changes: 97 additions & 44 deletions mars/dataframe/reduction/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def where_function(cond, var1, var2):
"kurt": lambda x, skipna=True, bias=False: x.kurt(skipna=skipna, bias=bias),
"kurtosis": lambda x, skipna=True, bias=False: x.kurtosis(skipna=skipna, bias=bias),
"nunique": lambda x: x.nunique(),
"mode": lambda x, skipna=True: x.mode(skipna=skipna),
}


Expand Down Expand Up @@ -126,53 +127,83 @@ def _calc_result_shape(self, df):

if isinstance(result_df, pd.DataFrame):
self.output_types = [OutputType.dataframe]
return result_df.dtypes, result_df.index
return result_df.dtypes, result_df.index, 2
elif isinstance(result_df, pd.Series):
self.output_types = [OutputType.series]
return pd.Series([result_df.dtype], index=[result_df.name]), result_df.index
return (
pd.Series([result_df.dtype], index=[result_df.name]),
result_df.index,
1,
)
else:
self.output_types = [OutputType.scalar]
return np.array(result_df).dtype, None
return np.array(result_df).dtype, None, 0

def _get_reduced_dim_unit(self, in_ndim, out_ndim):
"""
If rows can be reduced into multiple columns, return nan,
otherwise returns 1
"""
if not isinstance(self.raw_func, str) and isinstance(self.raw_func, Iterable):
return 1
return 1 if in_ndim != out_ndim else np.nan

def __call__(self, df, output_type=None, dtypes=None, index=None):
self._output_types = df.op.output_types
normalize_reduction_funcs(self, ndim=df.ndim)
if output_type is None or dtypes is None:
with enter_mode(kernel=False, build=False):
dtypes, index = self._calc_result_shape(df)
dtypes, index, out_ndim = self._calc_result_shape(df)
else:
self.output_types = [output_type]
if output_type == OutputType.dataframe:
out_ndim = 2
elif output_type == OutputType.series:
out_ndim = 1
else:
out_ndim = 0

reduced_len = self._get_reduced_dim_unit(df.ndim, out_ndim)
if self.output_types[0] == OutputType.dataframe:
if self.axis == 0:
new_shape = (len(index), len(dtypes))
new_index = parse_index(index, store_data=True)
new_shape = (len(index) * reduced_len, len(dtypes))
new_index_value = parse_index(index, store_data=True)
new_dtypes = dtypes
new_col_name = parse_index(dtypes.index, store_data=True)
else:
new_shape = (df.shape[0], len(dtypes))
new_index = df.index_value
new_shape = (df.shape[0], len(dtypes) * reduced_len)
new_index_value = df.index_value
new_dtypes = None if np.isnan(reduced_len) else dtypes
new_col_name = parse_index(
dtypes.index, store_data=not np.isnan(reduced_len)
)
return self.new_dataframe(
[df],
shape=new_shape,
dtypes=dtypes,
index_value=new_index,
columns_value=parse_index(dtypes.index, store_data=True),
dtypes=new_dtypes,
index_value=new_index_value,
columns_value=new_col_name,
)
elif self.output_types[0] == OutputType.series:
if df.ndim == 1:
new_shape = (len(index),)
new_index = parse_index(index, store_data=True)
new_shape = (len(index) * reduced_len,)
new_index_value = parse_index(
index, store_data=not np.isnan(reduced_len)
)
elif self.axis == 0:
new_shape = (len(index),)
new_index = parse_index(index, store_data=True)
new_shape = (len(index) * reduced_len,)
new_index_value = parse_index(
index, store_data=not np.isnan(reduced_len)
)
else:
new_shape = (df.shape[0],)
new_index = df.index_value
new_index_value = df.index_value
return self.new_series(
[df],
shape=new_shape,
dtype=dtypes[0],
name=dtypes.index[0],
index_value=new_index,
index_value=new_index_value,
)
elif self.output_types[0] == OutputType.tensor:
return self.new_tileable([df], dtype=dtypes, shape=(np.nan,))
Expand All @@ -189,7 +220,7 @@ def _safe_append(d, key, val):
@classmethod
def _gen_map_chunks(
cls,
op,
op: "DataFrameAggregate",
in_df,
out_df,
func_infos: List[ReductionSteps],
Expand All @@ -208,6 +239,7 @@ def _gen_map_chunks(

agg_chunks = np.empty(agg_chunks_shape, dtype=object)
dtypes_cache = dict()
reduced_len = op._get_reduced_dim_unit(in_df.ndim, out_df.ndim)
for chunk in in_df.chunks:
input_index = chunk.index[1 - axis] if len(chunk.index) > 1 else 0
if input_index not in input_index_to_output:
Expand All @@ -234,9 +266,9 @@ def _gen_map_chunks(

if map_op.output_types[0] == OutputType.dataframe:
if axis == 0:
shape = (1, out_df.shape[-1])
shape = (reduced_len, chunk.shape[-1])
if out_df.ndim == 2:
columns_value = out_df.columns_value
columns_value = chunk.columns_value
index_value = out_df.index_value
else:
columns_value = out_df.index_value
Expand All @@ -259,11 +291,11 @@ def _gen_map_chunks(
index_value=index_value,
)
else:
shape = (out_df.shape[0], 1)
shape = (chunk.shape[0], reduced_len)
columns_value = parse_index(
pd.Index([0]), out_df.key, store_data=True
)
index_value = out_df.index_value
index_value = chunk.index_value

agg_chunk = map_op.new_chunk(
[chunk],
Expand All @@ -273,7 +305,9 @@ def _gen_map_chunks(
index_value=index_value,
)
else:
agg_chunk = map_op.new_chunk([chunk], shape=(1,), index=new_index)
agg_chunk = map_op.new_chunk(
[chunk], shape=(reduced_len,), index=new_index
)
agg_chunks[agg_chunk.index] = agg_chunk
return agg_chunks

Expand Down Expand Up @@ -409,6 +443,7 @@ def _tile_tree(cls, op: "DataFrameAggregate"):
chunks = cls._gen_map_chunks(
op, in_df, out_df, axis_func_infos, input_index_to_output
)
reduced_len = op._get_reduced_dim_unit(in_df.ndim, out_df.ndim)
while chunks.shape[axis] > combine_size:
if axis == 0:
new_chunks_shape = (
Expand All @@ -429,16 +464,16 @@ def _tile_tree(cls, op: "DataFrameAggregate"):
chks = chunks[i : i + combine_size, idx1]
chunk_index = (idx0, idx1)
if chks[0].ndim == 1:
concat_shape = (len(chks),)
agg_shape = (1,)
concat_shape = (len(chks) * reduced_len,)
agg_shape = (reduced_len,)
else:
concat_shape = (len(chks), chks[0].shape[1])
agg_shape = (chks[0].shape[1], 1)
concat_shape = (len(chks) * reduced_len, chks[0].shape[1])
agg_shape = (chks[0].shape[1], reduced_len)
else:
chks = chunks[idx1, i : i + combine_size]
chunk_index = (idx1, idx0)
concat_shape = (chks[0].shape[0], len(chks))
agg_shape = (chks[0].shape[0], 1)
concat_shape = (chks[0].shape[0], len(chks) * reduced_len)
agg_shape = (chks[0].shape[0], reduced_len)

chks = chks.reshape((chks.shape[0],)).tolist()
if len(chks) == 1:
Expand Down Expand Up @@ -485,12 +520,12 @@ def _tile_tree(cls, op: "DataFrameAggregate"):
if axis == 0:
chks = chunks[:, idx]
if chks[0].ndim == 1:
concat_shape = (len(chks),)
concat_shape = (len(chks) * reduced_len,)
else:
concat_shape = (len(chks), chks[0].shape[1])
concat_shape = (len(chks) * reduced_len, chks[0].shape[1])
else:
chks = chunks[idx, :]
concat_shape = (chks[0].shape[0], len(chks))
concat_shape = (chks[0].shape[0], len(chks) * reduced_len)
chks = chks.reshape((chks.shape[0],)).tolist()
chk = concat_op.new_chunk(
chks,
Expand Down Expand Up @@ -519,7 +554,7 @@ def _tile_tree(cls, op: "DataFrameAggregate"):
shape_len = len(col_index)
kw.update(
dict(
shape=(out_df.shape[0], shape_len),
shape=(out_df.shape[0] * reduced_len, shape_len),
columns_value=columns_value,
index=(0, idx),
dtypes=out_df.dtypes[columns_value.to_pandas()],
Expand All @@ -531,7 +566,10 @@ def _tile_tree(cls, op: "DataFrameAggregate"):
dict(
index=(idx, 0),
index_value=src_col_chunk.index_value,
shape=(src_col_chunk.shape[0], out_df.shape[1]),
shape=(
src_col_chunk.shape[0],
out_df.shape[1] * reduced_len,
),
dtypes=out_df.dtypes,
)
)
Expand Down Expand Up @@ -680,6 +718,8 @@ def _do_predefined_agg(cls, op: "DataFrameAggregate", input_obj, func_name, kwds
raise NotImplementedError("numeric_only not implemented under cudf")
if isinstance(input_obj, pd.Index):
kwds.pop("skipna", None)
if getattr(input_obj, "ndim", 0) > 1:
kwds["axis"] = op.axis
return getattr(input_obj, func_name)(**kwds)

@classmethod
Expand Down Expand Up @@ -830,7 +870,19 @@ def _execute_agg(cls, ctx, op: "DataFrameAggregate"):
ser_index = None
if agg_series_ndim < out.ndim:
ser_index = [func_name]
aggs.append(cls._wrap_df(op, agg_series, index=ser_index))
if (
isinstance(agg_series, np.ndarray)
and getattr(func_inputs[0], "ndim", 0) >= 1
and hasattr(func_inputs[0], "index")
):
agg_series = cls._wrap_df(op, agg_series, index=ser_index)
if op.axis == 0:
agg_series.columns = func_inputs[0].index
else:
agg_series.index = func_inputs[0].index
else:
agg_series = cls._wrap_df(op, agg_series, index=ser_index)
aggs.append(agg_series)

# concatenate to produce final result
concat_df = xdf.concat(aggs, axis=axis)
Expand All @@ -843,25 +895,26 @@ def _execute_agg(cls, ctx, op: "DataFrameAggregate"):
concat_df = concat_df.iloc[:, 0]
else:
concat_df = concat_df.iloc[:, 0]
concat_df.name = op.outputs[0].name
concat_df.name = out.name

concat_df = concat_df.astype(op.outputs[0].dtype, copy=False)
concat_df = concat_df.astype(out.dtype, copy=False)
elif op.output_types[0] == OutputType.scalar:
concat_df = concat_df.iloc[0]
try:
concat_df = concat_df.astype(op.outputs[0].dtype)
concat_df = concat_df.astype(out.dtype)
except AttributeError:
# concat_df may be a string and has no `astype` method
pass
elif op.output_types[0] == OutputType.tensor:
concat_df = xp.array(concat_df).astype(dtype=out.dtype)
else:
if axis == 0:
concat_df = concat_df.reindex(op.outputs[0].index_value.to_pandas())
else:
concat_df = concat_df[op.outputs[0].columns_value.to_pandas()]
if not np.isnan(out.shape[op.axis]):
if axis == 0:
concat_df = concat_df.reindex(out.index_value.to_pandas())
else:
concat_df = concat_df[out.columns_value.to_pandas()]

concat_df = concat_df.astype(op.outputs[0].dtypes, copy=False)
concat_df = concat_df.astype(out.dtypes, copy=False)
ctx[op.outputs[0].key] = concat_df

@classmethod
Expand Down Expand Up @@ -895,7 +948,7 @@ def execute(cls, ctx, op: "DataFrameAggregate"):
):
result = op.func[0](in_data)
else:
result = in_data.agg(op.raw_func, axis=op.axis)
result = in_data.agg(op.raw_func, axis=op.axis, **op.raw_func_kw)
if op.outputs[0].ndim == 1:
result = result.astype(op.outputs[0].dtype, copy=False)

Expand Down
Loading