diff --git a/python/xorbits/_mars/dataframe/groupby/__init__.py b/python/xorbits/_mars/dataframe/groupby/__init__.py
index 4b76ca6c4..de89be3cf 100644
--- a/python/xorbits/_mars/dataframe/groupby/__init__.py
+++ b/python/xorbits/_mars/dataframe/groupby/__init__.py
@@ -12,9 +12,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 # noinspection PyUnresolvedReferences
 from ..core import DataFrameGroupBy, GroupBy, SeriesGroupBy
+from .len import groupby_len
 
 
 def _install():
@@ -63,6 +63,7 @@ def _install():
     setattr(cls, "sem", lambda groupby, **kw: agg(groupby, "sem", **kw))
     setattr(cls, "nunique", lambda groupby, **kw: agg(groupby, "nunique", **kw))
 
+    setattr(cls, "__len__", groupby_len)
     setattr(cls, "apply", groupby_apply)
     setattr(cls, "transform", groupby_transform)
 
diff --git a/python/xorbits/_mars/dataframe/groupby/len.py b/python/xorbits/_mars/dataframe/groupby/len.py
new file mode 100644
index 000000000..d7733e604
--- /dev/null
+++ b/python/xorbits/_mars/dataframe/groupby/len.py
@@ -0,0 +1,88 @@
+import pandas as pd
+
+from ... import opcodes
+from ...core import OutputType
+from ...core.operand import Operand, OperandStage
+from ..operands import DataFrameOperandMixin
+
+
+class GroupByLen(DataFrameOperandMixin, Operand):
+    _op_type_ = opcodes.GROUPBY_LEN
+
+    def __call__(self, groupby):
+        return self.new_scalar([groupby])
+
+    @classmethod
+    def tile(cls, op: "GroupByLen"):
+        in_groupby = op.inputs[0]
+
+        # generate map chunks
+        map_chunks = []
+        for chunk in in_groupby.chunks:
+            map_op = op.copy().reset_key()
+            map_op.stage = OperandStage.map
+            map_op.output_types = [OutputType.series]
+            chunk_inputs = [chunk]
+
+            map_chunks.append(map_op.new_chunk(chunk_inputs))
+
+        # generate reduce chunks; only one reducer is needed here
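+        # the single reducer below receives each map chunk's Series of group
+        # keys and counts the distinct keys, i.e. the total number of groups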
+        out_chunks = []
+        reduce_op = op.copy().reset_key()
+        reduce_op.output_types = [OutputType.scalar]
+        reduce_op.stage = OperandStage.reduce
+
+        out_chunks.append(
+            reduce_op.new_chunk(map_chunks, shape=(), index=(0,), dtype=int)
+        )
+
+        # final wrap-up: build the tiled scalar output
+        new_op = op.copy()
+        params = op.outputs[0].params.copy()
+
+        params.pop("shape")
+
+        params["chunks"] = out_chunks
+        return new_op.new_scalars(op.inputs, **params)
+
+    @classmethod
+    def execute_map(cls, ctx, op: "GroupByLen"):
+        chunk = op.outputs[0]
+        in_df_grouped = ctx[op.inputs[0].key]
+
+        # .size() on the grouped object yields one entry per unique group key
+        summary = in_df_grouped.size()
+        sum_indexes = summary.index
+
+        res = []
+        for index in sum_indexes:
+            res.append(index)
+
+        # ship the group keys seen in this chunk to the reducer as a Series
+        ctx[chunk.key] = pd.Series(res)
+
+    @classmethod
+    def execute_reduce(cls, ctx, op: "GroupByLen"):
+        chunk = op.outputs[0]
+        res = set()
+        for inp in op.inputs:
+            key = inp.key
+            input_series = ctx[key]
+            res.update(input_series)
+
+        res_len = len(res)
+        ctx[chunk.key] = res_len
+
+    @classmethod
+    def execute(cls, ctx, op: "GroupByLen"):
+        if op.stage == OperandStage.map:
+            cls.execute_map(ctx, op)
+        elif op.stage == OperandStage.reduce:
+            cls.execute_reduce(ctx, op)
+
+
+def groupby_len(groupby):
+    op = GroupByLen()
+    return op(groupby)
diff --git a/python/xorbits/_mars/dataframe/groupby/tests/test_groupby.py b/python/xorbits/_mars/dataframe/groupby/tests/test_groupby.py
index c67888ddf..3cd6e461c 100644
--- a/python/xorbits/_mars/dataframe/groupby/tests/test_groupby.py
+++ b/python/xorbits/_mars/dataframe/groupby/tests/test_groupby.py
@@ -525,3 +525,41 @@ def test_groupby_fill():
     assert len(r.chunks) == 4
     assert r.shape == (len(s1),)
     assert r.chunks[0].shape == (np.nan,)
+
+
+def test_groupby_len_behavior(setup):
+    df = pd.DataFrame(
+        [
+            [2, 11, 10],
+            [3, 1, 89],
+            [6, 1, 51],
+            [6, 2, 10],
+            [6, 2, 20],
+            [3, 2, 35],
+            [7, 3, 102],
+            [2, 3, 88],
+        ],
+        columns=["one", "two", "three"],
+    )
+    mdf = md.DataFrame(df, chunk_size=3)
+
+    r = tile(mdf.groupby(["two"]).__len__())
+    assert r.op.output_types[0] == OutputType.scalar
+    assert r.shape == ()
+    assert len(r.chunks) == 1
+    assert r.chunks[0].shape == ()
+
+    r = tile(mdf.groupby(["one", "two"]).__len__())
+    assert r.op.output_types[0] == OutputType.scalar
+    assert r.shape == ()
+    assert len(r.chunks) == 1
+    assert r.chunks[0].shape == ()
+
+    s1 = pd.Series([4, 3, 9, np.nan, np.nan, 7, 10, 8, 1, 6])
+    ms1 = md.Series(s1, chunk_size=3)
+
+    r = tile(ms1.groupby(lambda x: x % 2).__len__())
+    assert r.op.output_types[0] == OutputType.scalar
+    assert r.shape == ()
+    assert len(r.chunks) == 1
+    assert r.chunks[0].shape == ()
diff --git a/python/xorbits/_mars/dataframe/groupby/tests/test_groupby_execution.py b/python/xorbits/_mars/dataframe/groupby/tests/test_groupby_execution.py
index f2c6b888f..0bbc2f05e 100644
--- a/python/xorbits/_mars/dataframe/groupby/tests/test_groupby_execution.py
+++ b/python/xorbits/_mars/dataframe/groupby/tests/test_groupby_execution.py
@@ -1886,3 +1886,41 @@ def test_series_groupby_rolling_agg(setup, window, min_periods, center, closed,
     mresult = mresult.execute().fetch()
 
     pd.testing.assert_series_equal(presult, mresult.sort_index())
+
+
+def test_groupby_len(setup):
+    np.random.seed(42)
+    num_dataframes = 10
+    for _ in range(num_dataframes):
+        # random data: few distinct categories, many distinct values
+        data = {
+            "Category": np.random.choice(["A", "B", "C"], size=100),
+            "Value": np.random.randint(1, 100, size=100),
+        }
+
+        # DataFrame groupby test
+        df_test = pd.DataFrame(data)
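+        # chunk_size=35 splits the 100-row frame into multiple chunks, so the
+        # map/reduce path of GroupByLen is exercised instead of a single chunk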
+        df_split = md.DataFrame(df_test, chunk_size=35)
+
+        grouped_test = df_test.groupby(
+            "Category"
+        )  # the plain pandas groupby, used as the reference
+        grouped_split = df_split.groupby("Category")
+        grouped_test2 = df_test.groupby("Value")
+        grouped_split2 = df_split.groupby("Value")
+
+        assert grouped_split.__len__().execute().fetch() == len(grouped_test)
+        assert grouped_split2.__len__().execute().fetch() == len(grouped_test2)
+
+        # Series groupby test:
+        data2 = np.random.choice(["A", "B", "C"], size=100)
+
+        series = md.Series(data2, chunk_size=35)
+        series_test = pd.Series(data2)
+
+        grouped_s = series.groupby(series)
+        grouped_s_test = series_test.groupby(series_test)
+
+        assert grouped_s.__len__().execute().fetch() == len(grouped_s_test)
diff --git a/python/xorbits/_mars/opcodes.py b/python/xorbits/_mars/opcodes.py
index dc44f1f7f..1a4c8d62e 100644
--- a/python/xorbits/_mars/opcodes.py
+++ b/python/xorbits/_mars/opcodes.py
@@ -390,6 +390,7 @@
 APPLYMAP = 742
 PIVOT = 743
 PIVOT_TABLE = 744
+LEN = 745
 
 
 FUSE = 801
@@ -434,6 +435,7 @@
 GROUPBY_SORT_REGULAR_SAMPLE = 2037
 GROUPBY_SORT_PIVOT = 2038
 GROUPBY_SORT_SHUFFLE = 2039
+GROUPBY_LEN = 2064
 
 # parallel sorting by regular sampling
 PSRS_SORT_REGULAR_SMAPLE = 2040
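Usage sketch of the behaviour the diff adds (an illustration, not part of the patch; it assumes `md` is bound to the Mars dataframe module as in the test files, and that a default session is available so `.execute()` works). Note that the operand returns a lazy scalar, so the tests call `__len__()` explicitly and materialise it with `.execute().fetch()` rather than relying on the built-in `len()`:

    import pandas as pd

    raw = pd.DataFrame({"key": ["a", "a", "b", "c", "c", "c"], "val": range(6)})
    mdf = md.DataFrame(raw, chunk_size=2)  # several chunks -> map/reduce path

    # number of groups, computed chunk-wise and fetched as a plain integer
    n_groups = mdf.groupby("key").__len__().execute().fetch()
    assert n_groups == len(raw.groupby("key"))  # pandas reports 3 groups here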