From 35a6a8a74ef0b4c30b5f77310a179bd833089cdd Mon Sep 17 00:00:00 2001 From: Rui Ji Date: Fri, 9 Jun 2023 16:49:17 +0800 Subject: [PATCH 01/17] fix some minor issues on formatting --- python/xorbits/_mars/services/cluster/gather.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/xorbits/_mars/services/cluster/gather.py b/python/xorbits/_mars/services/cluster/gather.py index 02ed3773c..d6191af8f 100644 --- a/python/xorbits/_mars/services/cluster/gather.py +++ b/python/xorbits/_mars/services/cluster/gather.py @@ -171,7 +171,7 @@ def gather_node_resource(band_to_resource: Dict[str, Resource] = None, use_gpu=T if not num_gpu: continue res[f"gpu-{idx}"] = { - "gpu_avail": 1 - gpu_card_stat.gpu_usage, + "gpu_avail": num_gpu - (gpu_card_stat.gpu_usage / 100.0), "gpu_total": num_gpu, "gpu_memory_avail": gpu_card_stat.fb_mem_info.available, "gpu_memory_total": gpu_card_stat.fb_mem_info.total, From e7c6224297a61c389e86fd34cb4781d2fb9bc6d9 Mon Sep 17 00:00:00 2001 From: Rui Ji Date: Fri, 16 Jun 2023 18:03:39 +0800 Subject: [PATCH 02/17] first try on implement Len Operands --- .../_mars/dataframe/groupby/__init__.py | 3 +- python/xorbits/_mars/dataframe/groupby/len.py | 98 +++++++++++++++++++ .../dataframe/groupby/tests/test_groupby.py | 15 +++ python/xorbits/_mars/opcodes.py | 2 + 4 files changed, 117 insertions(+), 1 deletion(-) create mode 100644 python/xorbits/_mars/dataframe/groupby/len.py diff --git a/python/xorbits/_mars/dataframe/groupby/__init__.py b/python/xorbits/_mars/dataframe/groupby/__init__.py index 4b76ca6c4..de89be3cf 100644 --- a/python/xorbits/_mars/dataframe/groupby/__init__.py +++ b/python/xorbits/_mars/dataframe/groupby/__init__.py @@ -12,9 +12,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - # noinspection PyUnresolvedReferences from ..core import DataFrameGroupBy, GroupBy, SeriesGroupBy +from .len import groupby_len def _install(): @@ -63,6 +63,7 @@ def _install(): setattr(cls, "sem", lambda groupby, **kw: agg(groupby, "sem", **kw)) setattr(cls, "nunique", lambda groupby, **kw: agg(groupby, "nunique", **kw)) + setattr(cls, "__len__", groupby_len) setattr(cls, "apply", groupby_apply) setattr(cls, "transform", groupby_transform) diff --git a/python/xorbits/_mars/dataframe/groupby/len.py b/python/xorbits/_mars/dataframe/groupby/len.py new file mode 100644 index 000000000..6bbe66b17 --- /dev/null +++ b/python/xorbits/_mars/dataframe/groupby/len.py @@ -0,0 +1,98 @@ +import numpy as np +import pandas as pd + +from ... import opcodes +from ...core import OutputType +from ...core.operand import MapReduceOperand, OperandStage +from ..operands import DataFrameOperandMixin, DataFrameShuffleProxy + + +class GroupByLen(DataFrameOperandMixin, MapReduceOperand): + _op_type_ = opcodes.GROUPBY_LEN + + def __call__(self, groupby): + _in_groupby = ( + groupby # the input shall be wrapped-groupby return by the groupby method. 
+ ) + + return self.new_scalar(self, _in_groupby) + + @classmethod + def tile(cls, op: "GroupByLen"): + in_groupby = op.inputs[0] + output_type = OutputType.series + + # generate map chunks + map_chunks = [] + for chunk in in_groupby.chunks: + map_op = op.copy().reset_key() + map_op.stage = OperandStage.map + map_op._shuffle_size = in_groupby.chunk_shape[0] + map_op._output_types = OutputType.series + chunk_inputs = [chunk] + + map_chunks.append(map_op.new_chunk(chunk_inputs)) + + proxy_chunk = DataFrameShuffleProxy(output_types=[output_type]).new_chunk( + map_chunks, shape=() + ) + + # generate reduce chunks, we only need one reducer here. + reduce_chunks = [] + reduce_op = op.copy().reset_key() + reduce_op._output_types = OutputType.scalar + reduce_op.stage = OperandStage.reduce + reduce_chunks.append(reduce_op.new_chunk([proxy_chunk])) + + # generate the result chunk: + out_chunks = [] + for chunk in reduce_chunks: + groupby_len_op = op.copy().reset_key() + out_chunks.append(groupby_len_op.new_chunk([chunk])) + + # final wrap up: + new_op = op.copy() + params = op.outputs[0].params.copy() + params["nsplits"] = ((np.nan,) * len(out_chunks),) + params["chunks"] = out_chunks + return new_op.new_tileables(in_groupby, **params) + + @classmethod + def execute_map(cls, ctx, op): + chunk = op.outputs[0] + in_df_grouped = ctx[op.inputs[0].key] + + # grouped object .size() method ensure every unique keys + summary = in_df_grouped.size() + sum_indexes = summary.index + + res = [] + for index in sum_indexes: + res.append(index) + + # use series to convey every index store in this level + reduce_index = 1 + ctx[chunk.key, reduce_index] = pd.Series(res) + + @classmethod + def execute_reduce(cls, ctx, op: "GroupByLen"): + chunk = op.outputs[0] + input_idx_to_series = dict(op.iter_mapper_data(ctx)) + row_idxes = sorted(input_idx_to_series.keys()) + + res = set() + for row_index in row_idxes: + row_series = input_idx_to_series.get(row_index, None) + res.update(row_series) + + res_len = len(res) + ctx[chunk.key] = res_len + + @classmethod + def execute(cls, ctx, op: "GroupByLen"): + ctx[op.output[0].key] = ctx[op.inputs[0].key] + + +def groupby_len(groupby): + op = GroupByLen() + return op(groupby) diff --git a/python/xorbits/_mars/dataframe/groupby/tests/test_groupby.py b/python/xorbits/_mars/dataframe/groupby/tests/test_groupby.py index c67888ddf..be53038e2 100644 --- a/python/xorbits/_mars/dataframe/groupby/tests/test_groupby.py +++ b/python/xorbits/_mars/dataframe/groupby/tests/test_groupby.py @@ -525,3 +525,18 @@ def test_groupby_fill(): assert len(r.chunks) == 4 assert r.shape == (len(s1),) assert r.chunks[0].shape == (np.nan,) + + +def test_groupby_len(setup): + df = md.DataFrame( + { + "a": ["a", "b", "a", "c"], + "b": [0.1, 0.2, 0.3, 0.4], + "c": ["aa", "bb", "cc", "aa"], + } + ) + + grouped = df.groupby("b") + + num_groups = len(grouped) + print(num_groups) diff --git a/python/xorbits/_mars/opcodes.py b/python/xorbits/_mars/opcodes.py index dc44f1f7f..1a4c8d62e 100644 --- a/python/xorbits/_mars/opcodes.py +++ b/python/xorbits/_mars/opcodes.py @@ -390,6 +390,7 @@ APPLYMAP = 742 PIVOT = 743 PIVOT_TABLE = 744 +LEN = 745 FUSE = 801 @@ -434,6 +435,7 @@ GROUPBY_SORT_REGULAR_SAMPLE = 2037 GROUPBY_SORT_PIVOT = 2038 GROUPBY_SORT_SHUFFLE = 2039 +GROUPBY_LEN = 2064 # parallel sorting by regular sampling PSRS_SORT_REGULAR_SMAPLE = 2040 From a7819c57f270b02af210768ecfb55545719bbded Mon Sep 17 00:00:00 2001 From: Rui Ji Date: Fri, 16 Jun 2023 18:12:37 +0800 Subject: [PATCH 03/17] first try on implement Len 
Operands2 --- python/xorbits/_mars/dataframe/groupby/len.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/python/xorbits/_mars/dataframe/groupby/len.py b/python/xorbits/_mars/dataframe/groupby/len.py index 6bbe66b17..bf971f565 100644 --- a/python/xorbits/_mars/dataframe/groupby/len.py +++ b/python/xorbits/_mars/dataframe/groupby/len.py @@ -90,7 +90,12 @@ def execute_reduce(cls, ctx, op: "GroupByLen"): @classmethod def execute(cls, ctx, op: "GroupByLen"): - ctx[op.output[0].key] = ctx[op.inputs[0].key] + if op.stage == OperandStage.map: + cls.execute_map(ctx, op) + elif op.stage == OperandStage.reduce: + cls.execute_reduce(ctx, op) + else: + ctx[op.output[0].key] = ctx[op.inputs[0].key] def groupby_len(groupby): From 98f0a0811d6fa4c89d26466a3ed1c98d56cdf311 Mon Sep 17 00:00:00 2001 From: Rui Ji Date: Wed, 21 Jun 2023 12:00:49 +0800 Subject: [PATCH 04/17] issues of reducer_index --- python/xorbits/_mars/dataframe/groupby/len.py | 19 +++++++------------ .../groupby/tests/test_groupby_execution.py | 13 +++++++++++++ 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/python/xorbits/_mars/dataframe/groupby/len.py b/python/xorbits/_mars/dataframe/groupby/len.py index bf971f565..b47c71362 100644 --- a/python/xorbits/_mars/dataframe/groupby/len.py +++ b/python/xorbits/_mars/dataframe/groupby/len.py @@ -11,16 +11,12 @@ class GroupByLen(DataFrameOperandMixin, MapReduceOperand): _op_type_ = opcodes.GROUPBY_LEN def __call__(self, groupby): - _in_groupby = ( - groupby # the input shall be wrapped-groupby return by the groupby method. - ) - - return self.new_scalar(self, _in_groupby) + return self.new_scalar([groupby]) @classmethod def tile(cls, op: "GroupByLen"): in_groupby = op.inputs[0] - output_type = OutputType.series + output_type = op.output_types # generate map chunks map_chunks = [] @@ -28,7 +24,7 @@ def tile(cls, op: "GroupByLen"): map_op = op.copy().reset_key() map_op.stage = OperandStage.map map_op._shuffle_size = in_groupby.chunk_shape[0] - map_op._output_types = OutputType.series + map_op._output_types = output_type chunk_inputs = [chunk] map_chunks.append(map_op.new_chunk(chunk_inputs)) @@ -40,7 +36,7 @@ def tile(cls, op: "GroupByLen"): # generate reduce chunks, we only need one reducer here. 
reduce_chunks = [] reduce_op = op.copy().reset_key() - reduce_op._output_types = OutputType.scalar + reduce_op._output_types = [OutputType.scalar] reduce_op.stage = OperandStage.reduce reduce_chunks.append(reduce_op.new_chunk([proxy_chunk])) @@ -55,7 +51,7 @@ def tile(cls, op: "GroupByLen"): params = op.outputs[0].params.copy() params["nsplits"] = ((np.nan,) * len(out_chunks),) params["chunks"] = out_chunks - return new_op.new_tileables(in_groupby, **params) + return new_op.new_tileables([in_groupby], **params) @classmethod def execute_map(cls, ctx, op): @@ -71,8 +67,7 @@ def execute_map(cls, ctx, op): res.append(index) # use series to convey every index store in this level - reduce_index = 1 - ctx[chunk.key, reduce_index] = pd.Series(res) + ctx[chunk.key, 1] = pd.Series(res) @classmethod def execute_reduce(cls, ctx, op: "GroupByLen"): @@ -100,4 +95,4 @@ def execute(cls, ctx, op: "GroupByLen"): def groupby_len(groupby): op = GroupByLen() - return op(groupby) + return op(groupby).execute().fetch() diff --git a/python/xorbits/_mars/dataframe/groupby/tests/test_groupby_execution.py b/python/xorbits/_mars/dataframe/groupby/tests/test_groupby_execution.py index f2c6b888f..462a3cc7d 100644 --- a/python/xorbits/_mars/dataframe/groupby/tests/test_groupby_execution.py +++ b/python/xorbits/_mars/dataframe/groupby/tests/test_groupby_execution.py @@ -1886,3 +1886,16 @@ def test_series_groupby_rolling_agg(setup, window, min_periods, center, closed, mresult = mresult.execute().fetch() pd.testing.assert_series_equal(presult, mresult.sort_index()) + + +def test_grouby_len(setup): + df = md.DataFrame( + { + "a": ["a", "b", "a", "c"], + "b": [0.1, 0.2, 0.3, 0.4], + "c": ["aa", "bb", "cc", "aa"], + } + ) + grouped = df.groupby("b") + + print(len(grouped)) From f21a6834983b686706eaf31f70a12728ddeda793 Mon Sep 17 00:00:00 2001 From: Rui Ji Date: Wed, 21 Jun 2023 15:46:56 +0800 Subject: [PATCH 05/17] issues of reducer_index --- python/xorbits/_mars/dataframe/groupby/len.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/xorbits/_mars/dataframe/groupby/len.py b/python/xorbits/_mars/dataframe/groupby/len.py index b47c71362..6fe173e9f 100644 --- a/python/xorbits/_mars/dataframe/groupby/len.py +++ b/python/xorbits/_mars/dataframe/groupby/len.py @@ -27,7 +27,7 @@ def tile(cls, op: "GroupByLen"): map_op._output_types = output_type chunk_inputs = [chunk] - map_chunks.append(map_op.new_chunk(chunk_inputs)) + map_chunks.append(map_op.new_chunk(chunk_inputs, index=chunk.index)) proxy_chunk = DataFrameShuffleProxy(output_types=[output_type]).new_chunk( map_chunks, shape=() @@ -38,7 +38,8 @@ def tile(cls, op: "GroupByLen"): reduce_op = op.copy().reset_key() reduce_op._output_types = [OutputType.scalar] reduce_op.stage = OperandStage.reduce - reduce_chunks.append(reduce_op.new_chunk([proxy_chunk])) + reduce_op.n_reducers = 1 + reduce_chunks.append(reduce_op.new_chunk([proxy_chunk], index=1)) # generate the result chunk: out_chunks = [] From ba93ce7c41eea26bd02255ef4e3b1def05754705 Mon Sep 17 00:00:00 2001 From: UranusSeven <109661872+UranusSeven@users.noreply.github.com> Date: Mon, 26 Jun 2023 11:45:11 +0800 Subject: [PATCH 06/17] Fix tile --- python/xorbits/_mars/dataframe/groupby/len.py | 33 +++++-------------- 1 file changed, 9 insertions(+), 24 deletions(-) diff --git a/python/xorbits/_mars/dataframe/groupby/len.py b/python/xorbits/_mars/dataframe/groupby/len.py index 6fe173e9f..12468037a 100644 --- a/python/xorbits/_mars/dataframe/groupby/len.py +++ 
b/python/xorbits/_mars/dataframe/groupby/len.py @@ -3,11 +3,11 @@ from ... import opcodes from ...core import OutputType -from ...core.operand import MapReduceOperand, OperandStage -from ..operands import DataFrameOperandMixin, DataFrameShuffleProxy +from ...core.operand import Operand, OperandStage +from ..operands import DataFrameOperandMixin -class GroupByLen(DataFrameOperandMixin, MapReduceOperand): +class GroupByLen(DataFrameOperandMixin, Operand): _op_type_ = opcodes.GROUPBY_LEN def __call__(self, groupby): @@ -16,43 +16,30 @@ def __call__(self, groupby): @classmethod def tile(cls, op: "GroupByLen"): in_groupby = op.inputs[0] - output_type = op.output_types # generate map chunks map_chunks = [] for chunk in in_groupby.chunks: map_op = op.copy().reset_key() map_op.stage = OperandStage.map - map_op._shuffle_size = in_groupby.chunk_shape[0] - map_op._output_types = output_type + map_op.output_types = [OutputType.series] chunk_inputs = [chunk] - map_chunks.append(map_op.new_chunk(chunk_inputs, index=chunk.index)) - - proxy_chunk = DataFrameShuffleProxy(output_types=[output_type]).new_chunk( - map_chunks, shape=() - ) + map_chunks.append(map_op.new_chunk(chunk_inputs)) # generate reduce chunks, we only need one reducer here. - reduce_chunks = [] + out_chunks = [] reduce_op = op.copy().reset_key() - reduce_op._output_types = [OutputType.scalar] + reduce_op.output_types = [OutputType.scalar] reduce_op.stage = OperandStage.reduce - reduce_op.n_reducers = 1 - reduce_chunks.append(reduce_op.new_chunk([proxy_chunk], index=1)) - - # generate the result chunk: - out_chunks = [] - for chunk in reduce_chunks: - groupby_len_op = op.copy().reset_key() - out_chunks.append(groupby_len_op.new_chunk([chunk])) + out_chunks.append(reduce_op.new_chunk(map_chunks)) # final wrap up: new_op = op.copy() params = op.outputs[0].params.copy() params["nsplits"] = ((np.nan,) * len(out_chunks),) params["chunks"] = out_chunks - return new_op.new_tileables([in_groupby], **params) + return new_op.new_scalar(op.inputs, **params) @classmethod def execute_map(cls, ctx, op): @@ -90,8 +77,6 @@ def execute(cls, ctx, op: "GroupByLen"): cls.execute_map(ctx, op) elif op.stage == OperandStage.reduce: cls.execute_reduce(ctx, op) - else: - ctx[op.output[0].key] = ctx[op.inputs[0].key] def groupby_len(groupby): From 5dbbbb086be1b21623a9ace0cd9186585abcb2ff Mon Sep 17 00:00:00 2001 From: Rui Ji Date: Wed, 28 Jun 2023 15:34:29 +0800 Subject: [PATCH 07/17] len method implemented with random test passed --- python/xorbits/_mars/dataframe/groupby/len.py | 22 +++++++++------- .../dataframe/groupby/tests/test_groupby.py | 26 ++++++++++++------- 2 files changed, 29 insertions(+), 19 deletions(-) diff --git a/python/xorbits/_mars/dataframe/groupby/len.py b/python/xorbits/_mars/dataframe/groupby/len.py index 12468037a..b38e77503 100644 --- a/python/xorbits/_mars/dataframe/groupby/len.py +++ b/python/xorbits/_mars/dataframe/groupby/len.py @@ -1,4 +1,3 @@ -import numpy as np import pandas as pd from ... 
import opcodes @@ -32,17 +31,22 @@ def tile(cls, op: "GroupByLen"): reduce_op = op.copy().reset_key() reduce_op.output_types = [OutputType.scalar] reduce_op.stage = OperandStage.reduce - out_chunks.append(reduce_op.new_chunk(map_chunks)) + params = dict(dtype=int) + out_chunks.append( + reduce_op.new_chunk(map_chunks, shape=(), index=(0,), dtype=int) + ) # final wrap up: new_op = op.copy() params = op.outputs[0].params.copy() - params["nsplits"] = ((np.nan,) * len(out_chunks),) + + params.pop("shape") + params["chunks"] = out_chunks - return new_op.new_scalar(op.inputs, **params) + return new_op.new_scalars(op.inputs, **params) @classmethod - def execute_map(cls, ctx, op): + def execute_map(cls, ctx, op: "GroupByLen"): chunk = op.outputs[0] in_df_grouped = ctx[op.inputs[0].key] @@ -60,13 +64,11 @@ def execute_map(cls, ctx, op): @classmethod def execute_reduce(cls, ctx, op: "GroupByLen"): chunk = op.outputs[0] - input_idx_to_series = dict(op.iter_mapper_data(ctx)) - row_idxes = sorted(input_idx_to_series.keys()) + key = op.inputs[0].key res = set() - for row_index in row_idxes: - row_series = input_idx_to_series.get(row_index, None) - res.update(row_series) + input_series = ctx[key, 1] + res.update(input_series) res_len = len(res) ctx[chunk.key] = res_len diff --git a/python/xorbits/_mars/dataframe/groupby/tests/test_groupby.py b/python/xorbits/_mars/dataframe/groupby/tests/test_groupby.py index be53038e2..6f7ab1d7b 100644 --- a/python/xorbits/_mars/dataframe/groupby/tests/test_groupby.py +++ b/python/xorbits/_mars/dataframe/groupby/tests/test_groupby.py @@ -528,15 +528,23 @@ def test_groupby_fill(): def test_groupby_len(setup): - df = md.DataFrame( - { - "a": ["a", "b", "a", "c"], - "b": [0.1, 0.2, 0.3, 0.4], - "c": ["aa", "bb", "cc", "aa"], + np.random.seed(42) + num_dataframes = 10 + for i in range(num_dataframes): + # Generate random data + data = { + "Category": np.random.choice(["A", "B", "C"], size=100), + "Value": np.random.randint(1, 100, size=100), } - ) - grouped = df.groupby("b") + # Create DataFrame + df = md.DataFrame(data) + df_test = pd.DataFrame(data) + + grouped = df.groupby("Category") + grouped_test = df_test.groupby("Category") - num_groups = len(grouped) - print(num_groups) + grouped2 = df.groupby("Value") + grouped_test2 = df_test.groupby("Value") + assert len(grouped) == len(grouped_test) + assert len(grouped2) == len(grouped_test2) From 4f9106f9eed5ccee4605a7503126a46e2d7174f0 Mon Sep 17 00:00:00 2001 From: Rui Ji Date: Thu, 29 Jun 2023 12:40:17 +0800 Subject: [PATCH 08/17] try to solve chunk_size issues --- python/xorbits/_mars/dataframe/groupby/len.py | 6 +- .../dataframe/groupby/tests/test_groupby.py | 31 ++++------ .../groupby/tests/test_groupby_execution.py | 57 ++++++++++++++++--- 3 files changed, 62 insertions(+), 32 deletions(-) diff --git a/python/xorbits/_mars/dataframe/groupby/len.py b/python/xorbits/_mars/dataframe/groupby/len.py index b38e77503..7981378f2 100644 --- a/python/xorbits/_mars/dataframe/groupby/len.py +++ b/python/xorbits/_mars/dataframe/groupby/len.py @@ -31,7 +31,7 @@ def tile(cls, op: "GroupByLen"): reduce_op = op.copy().reset_key() reduce_op.output_types = [OutputType.scalar] reduce_op.stage = OperandStage.reduce - params = dict(dtype=int) + out_chunks.append( reduce_op.new_chunk(map_chunks, shape=(), index=(0,), dtype=int) ) @@ -59,7 +59,7 @@ def execute_map(cls, ctx, op: "GroupByLen"): res.append(index) # use series to convey every index store in this level - ctx[chunk.key, 1] = pd.Series(res) + ctx[chunk.key] = pd.Series(res) 
@classmethod def execute_reduce(cls, ctx, op: "GroupByLen"): @@ -67,7 +67,7 @@ def execute_reduce(cls, ctx, op: "GroupByLen"): key = op.inputs[0].key res = set() - input_series = ctx[key, 1] + input_series = ctx[key] res.update(input_series) res_len = len(res) diff --git a/python/xorbits/_mars/dataframe/groupby/tests/test_groupby.py b/python/xorbits/_mars/dataframe/groupby/tests/test_groupby.py index 6f7ab1d7b..09df9b2e3 100644 --- a/python/xorbits/_mars/dataframe/groupby/tests/test_groupby.py +++ b/python/xorbits/_mars/dataframe/groupby/tests/test_groupby.py @@ -527,24 +527,13 @@ def test_groupby_fill(): assert r.chunks[0].shape == (np.nan,) -def test_groupby_len(setup): - np.random.seed(42) - num_dataframes = 10 - for i in range(num_dataframes): - # Generate random data - data = { - "Category": np.random.choice(["A", "B", "C"], size=100), - "Value": np.random.randint(1, 100, size=100), - } - - # Create DataFrame - df = md.DataFrame(data) - df_test = pd.DataFrame(data) - - grouped = df.groupby("Category") - grouped_test = df_test.groupby("Category") - - grouped2 = df.groupby("Value") - grouped_test2 = df_test.groupby("Value") - assert len(grouped) == len(grouped_test) - assert len(grouped2) == len(grouped_test2) +# def test_groupby_len_behavior(setup): +# np.random.seed(42) +# +# num_rows = 10 +# num_columns = 4 +# +# data = np.random.randint(1, 100, size=(num_rows, num_columns)) +# columns = ["Column1", "Column2", "Column3", "Column4"] +# +# df = pd.DataFrame(data, columns=columns) diff --git a/python/xorbits/_mars/dataframe/groupby/tests/test_groupby_execution.py b/python/xorbits/_mars/dataframe/groupby/tests/test_groupby_execution.py index 462a3cc7d..9029d8241 100644 --- a/python/xorbits/_mars/dataframe/groupby/tests/test_groupby_execution.py +++ b/python/xorbits/_mars/dataframe/groupby/tests/test_groupby_execution.py @@ -1888,14 +1888,55 @@ def test_series_groupby_rolling_agg(setup, window, min_periods, center, closed, pd.testing.assert_series_equal(presult, mresult.sort_index()) -def test_grouby_len(setup): - df = md.DataFrame( - { - "a": ["a", "b", "a", "c"], - "b": [0.1, 0.2, 0.3, 0.4], - "c": ["aa", "bb", "cc", "aa"], +def test_groupby_len(setup): + np.random.seed(42) + num_dataframes = 10 + for i in range(num_dataframes): + # dataframe + data = { + "Category": np.random.choice(["A", "B", "C"], size=100), + "Value": np.random.randint(1, 100, size=100), } - ) - grouped = df.groupby("b") + # DataFrame test + df_test = pd.DataFrame(data) + df = md.DataFrame(df_test) + df_splitted = md.DataFrame(df_test, chunk_size=3) + + grouped = df.groupby("Category") + grouped_test = df_test.groupby( + "Category" + ) # this is the original pandas version. 
+ grouped_splitted = df_splitted.groupby("Category") + + grouped2 = df.groupby("Value") + grouped_test2 = df_test.groupby("Value") + grouped_splitted2 = df_splitted.groupby("Value") + assert len(grouped) == len(grouped_test) + assert len(grouped_splitted) == len(grouped_test) + assert len(grouped2) == len(grouped_test2) + assert len(grouped_splitted2) == len(grouped_test2) + + # Series Groupby test: + data2 = np.random.choice(["A", "B", "C"], size=100) + + series = md.Series(data2) + series_test = pd.Series(data2) + + grouped_s = series.groupby(series) + grouped_s_test = series_test.groupby(series_test) + + assert len(grouped_s) == len(grouped_s_test) + + +def test_temp(setup): + data = { + "Category": np.random.choice(["A", "B", "C"], size=100), + "Value": np.random.randint(1, 100, size=100), + } + + # DataFrame test + df_test = pd.DataFrame(data) + df = md.DataFrame(df_test, chunk_size=3) + grouped = df.groupby("Category") print(len(grouped)) From 59fc7016b8b0aa7a1de7f2ce638a841dbd5bc1e9 Mon Sep 17 00:00:00 2001 From: Rui Ji Date: Thu, 29 Jun 2023 15:57:27 +0800 Subject: [PATCH 09/17] multiple chunks(chunk_size) implemented with UT and IT passed --- python/xorbits/_mars/dataframe/groupby/len.py | 10 ++-- .../dataframe/groupby/tests/test_groupby.py | 46 +++++++++++++++---- .../groupby/tests/test_groupby_execution.py | 30 +++--------- 3 files changed, 47 insertions(+), 39 deletions(-) diff --git a/python/xorbits/_mars/dataframe/groupby/len.py b/python/xorbits/_mars/dataframe/groupby/len.py index 7981378f2..d7733e604 100644 --- a/python/xorbits/_mars/dataframe/groupby/len.py +++ b/python/xorbits/_mars/dataframe/groupby/len.py @@ -64,11 +64,11 @@ def execute_map(cls, ctx, op: "GroupByLen"): @classmethod def execute_reduce(cls, ctx, op: "GroupByLen"): chunk = op.outputs[0] - key = op.inputs[0].key - res = set() - input_series = ctx[key] - res.update(input_series) + for input in op.inputs: + key = input.key + input_series = ctx[key] + res.update(input_series) res_len = len(res) ctx[chunk.key] = res_len @@ -83,4 +83,4 @@ def execute(cls, ctx, op: "GroupByLen"): def groupby_len(groupby): op = GroupByLen() - return op(groupby).execute().fetch() + return op(groupby) diff --git a/python/xorbits/_mars/dataframe/groupby/tests/test_groupby.py b/python/xorbits/_mars/dataframe/groupby/tests/test_groupby.py index 09df9b2e3..3cd6e461c 100644 --- a/python/xorbits/_mars/dataframe/groupby/tests/test_groupby.py +++ b/python/xorbits/_mars/dataframe/groupby/tests/test_groupby.py @@ -527,13 +527,39 @@ def test_groupby_fill(): assert r.chunks[0].shape == (np.nan,) -# def test_groupby_len_behavior(setup): -# np.random.seed(42) -# -# num_rows = 10 -# num_columns = 4 -# -# data = np.random.randint(1, 100, size=(num_rows, num_columns)) -# columns = ["Column1", "Column2", "Column3", "Column4"] -# -# df = pd.DataFrame(data, columns=columns) +def test_groupby_len_behavior(setup): + df = pd.DataFrame( + [ + [2, 11, 10], + [3, 1, 89], + [6, 1, 51], + [6, 2, 10], + [6, 2, 20], + [3, 2, 35], + [7, 3, 102], + [2, 3, 88], + ], + columns=["one", "two", "three"], + ) + mdf = md.DataFrame(df, chunk_size=3) + + r = tile(mdf.groupby(["two"]).__len__()) + assert r.op.output_types[0] == OutputType.scalar + assert r.shape == () + assert len(r.chunks) == 1 + assert r.chunks[0].shape == () + + r = tile(mdf.groupby(["one", "two"]).__len__()) + assert r.op.output_types[0] == OutputType.scalar + assert r.shape == () + assert len(r.chunks) == 1 + assert r.chunks[0].shape == () + + s1 = pd.Series([4, 3, 9, np.nan, np.nan, 7, 10, 8, 1, 6]) + 
ms1 = md.Series(s1, chunk_size=3)
+
+    r = tile(ms1.groupby(lambda x: x % 2).__len__())
+    assert r.op.output_types[0] == OutputType.scalar
+    assert r.shape == ()
+    assert len(r.chunks) == 1
+    assert r.chunks[0].shape == ()
diff --git a/python/xorbits/_mars/dataframe/groupby/tests/test_groupby_execution.py b/python/xorbits/_mars/dataframe/groupby/tests/test_groupby_execution.py
index 9029d8241..0bbc2f05e 100644
--- a/python/xorbits/_mars/dataframe/groupby/tests/test_groupby_execution.py
+++ b/python/xorbits/_mars/dataframe/groupby/tests/test_groupby_execution.py
@@ -1900,43 +1900,25 @@ def test_groupby_len(setup):
 
         # DataFrame test
         df_test = pd.DataFrame(data)
-        df = md.DataFrame(df_test)
-        df_splitted = md.DataFrame(df_test, chunk_size=3)
+        df_splitted = md.DataFrame(df_test, chunk_size=35)
 
-        grouped = df.groupby("Category")
         grouped_test = df_test.groupby(
             "Category"
         )  # this is the original pandas version.
         grouped_splitted = df_splitted.groupby("Category")
-
-        grouped2 = df.groupby("Value")
         grouped_test2 = df_test.groupby("Value")
         grouped_splitted2 = df_splitted.groupby("Value")
-        assert len(grouped) == len(grouped_test)
-        assert len(grouped_splitted) == len(grouped_test)
-        assert len(grouped2) == len(grouped_test2)
-        assert len(grouped_splitted2) == len(grouped_test2)
+
+        assert grouped_splitted.__len__().execute().fetch() == len(grouped_test)
+        assert grouped_splitted2.__len__().execute().fetch() == len(grouped_test2)
 
         # Series Groupby test:
         data2 = np.random.choice(["A", "B", "C"], size=100)
 
-        series = md.Series(data2)
+        series = md.Series(data2, chunk_size=35)
         series_test = pd.Series(data2)
 
         grouped_s = series.groupby(series)
         grouped_s_test = series_test.groupby(series_test)
 
-        assert len(grouped_s) == len(grouped_s_test)
-
-
-def test_temp(setup):
-    data = {
-        "Category": np.random.choice(["A", "B", "C"], size=100),
-        "Value": np.random.randint(1, 100, size=100),
-    }
-
-    # DataFrame test
-    df_test = pd.DataFrame(data)
-    df = md.DataFrame(df_test, chunk_size=3)
-    grouped = df.groupby("Category")
-    print(len(grouped))
+        assert grouped_s.__len__().execute().fetch() == len(grouped_s_test)
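
Note on the final design: across the series, __len__ settles into a two-stage plan. Each map task calls size() on its chunk's groupby object and emits the group keys it observed as a pd.Series; a single reduce task unions those keys into a set and returns the set's length, so a key that appears in several chunks is counted once. Below is a minimal pandas-only sketch of that counting logic, assuming a made-up frame, column name, and two-way chunk split; it is a standalone illustration of the technique, not the Mars operand itself:

import numpy as np
import pandas as pd

# Hypothetical frame, split by hand into two "chunks" the way a chunked
# Mars DataFrame would be partitioned (the data and split are invented).
df = pd.DataFrame(
    {
        "Category": np.random.choice(["A", "B", "C"], size=100),
        "Value": np.random.randint(1, 100, size=100),
    }
)
chunks = [df.iloc[:50], df.iloc[50:]]

# Map stage: each chunk reports the set of group keys it contains.
key_sets = [set(chunk.groupby("Category").size().index) for chunk in chunks]

# Reduce stage: union the per-chunk key sets; keys seen in more than one
# chunk collapse, so the size of the union is the number of groups.
n_groups = len(set().union(*key_sets))
assert n_groups == len(df.groupby("Category"))

This is also why patch 09 changes execute_reduce to loop over every entry in op.inputs rather than reading a single input key: with multiple chunks there is one mapper Series per chunk, and all of them must be folded into the set before its length is taken.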