diff --git a/python/docs/source/reference/pyspark.pandas/window.rst b/python/docs/source/reference/pyspark.pandas/window.rst index c840be357fa75..f6f3a484d3e98 100644 --- a/python/docs/source/reference/pyspark.pandas/window.rst +++ b/python/docs/source/reference/pyspark.pandas/window.rst @@ -39,6 +39,7 @@ Standard moving window functions Rolling.max Rolling.mean Rolling.quantile + Rolling.median Standard expanding window functions ----------------------------------- @@ -52,6 +53,7 @@ Standard expanding window functions Expanding.max Expanding.mean Expanding.quantile + Expanding.median Exponential moving window functions ----------------------------------- diff --git a/python/pyspark/pandas/missing/window.py b/python/pyspark/pandas/missing/window.py index a6d423d08f1c7..5342ed5170f1c 100644 --- a/python/pyspark/pandas/missing/window.py +++ b/python/pyspark/pandas/missing/window.py @@ -81,7 +81,6 @@ class MissingPandasLikeExpanding: apply = _unsupported_function_expanding("apply") corr = _unsupported_function_expanding("corr") cov = _unsupported_function_expanding("cov") - median = _unsupported_function_expanding("median") validate = _unsupported_function_expanding("validate") exclusions = _unsupported_property_expanding("exclusions") @@ -99,7 +98,6 @@ class MissingPandasLikeRolling: apply = _unsupported_function_rolling("apply") corr = _unsupported_function_rolling("corr") cov = _unsupported_function_rolling("cov") - median = _unsupported_function_rolling("median") validate = _unsupported_function_rolling("validate") exclusions = _unsupported_property_rolling("exclusions") @@ -117,7 +115,6 @@ class MissingPandasLikeExpandingGroupby: apply = _unsupported_function_expanding("apply") corr = _unsupported_function_expanding("corr") cov = _unsupported_function_expanding("cov") - median = _unsupported_function_expanding("median") validate = _unsupported_function_expanding("validate") exclusions = _unsupported_property_expanding("exclusions") @@ -135,7 +132,6 @@ class MissingPandasLikeRollingGroupby: apply = _unsupported_function_rolling("apply") corr = _unsupported_function_rolling("corr") cov = _unsupported_function_rolling("cov") - median = _unsupported_function_rolling("median") validate = _unsupported_function_rolling("validate") exclusions = _unsupported_property_rolling("exclusions") diff --git a/python/pyspark/pandas/tests/window/test_expanding_adv.py b/python/pyspark/pandas/tests/window/test_expanding_adv.py index 554c11e46b22d..2a639773b0885 100644 --- a/python/pyspark/pandas/tests/window/test_expanding_adv.py +++ b/python/pyspark/pandas/tests/window/test_expanding_adv.py @@ -23,6 +23,9 @@ class ExpandingAdvMixin(ExpandingTestingFuncMixin): + def test_expanding_median(self): + self._test_expanding_func("median", lambda x: x.quantile(0.5, "lower")) + def test_expanding_quantile(self): self._test_expanding_func(lambda x: x.quantile(0.5), lambda x: x.quantile(0.5, "lower")) diff --git a/python/pyspark/pandas/tests/window/test_groupby_expanding_adv.py b/python/pyspark/pandas/tests/window/test_groupby_expanding_adv.py index 4de1bce2bae8d..a59a97eb2667b 100644 --- a/python/pyspark/pandas/tests/window/test_groupby_expanding_adv.py +++ b/python/pyspark/pandas/tests/window/test_groupby_expanding_adv.py @@ -20,6 +20,9 @@ class GroupByExpandingAdvMixin(GroupByExpandingTestingFuncMixin): + def test_groupby_expanding_median(self): + self._test_groupby_expanding_func("median", lambda x: x.quantile(0.5, "lower")) + def test_groupby_expanding_quantile(self): self._test_groupby_expanding_func( lambda x: x.quantile(0.5), lambda x: x.quantile(0.5, "lower") diff --git a/python/pyspark/pandas/tests/window/test_groupby_rolling_adv.py b/python/pyspark/pandas/tests/window/test_groupby_rolling_adv.py index 567cdfa674653..7515c7be9cadb 100644 --- a/python/pyspark/pandas/tests/window/test_groupby_rolling_adv.py +++ b/python/pyspark/pandas/tests/window/test_groupby_rolling_adv.py @@ -20,6 +20,9 @@ class GroupByRollingAdvMixin(GroupByRollingTestingFuncMixin): + def test_groupby_rolling_median(self): + self._test_groupby_rolling_func("median", lambda x: x.quantile(0.5, "lower")) + def test_groupby_rolling_quantile(self): self._test_groupby_rolling_func( lambda x: x.quantile(0.5), lambda x: x.quantile(0.5, "lower") diff --git a/python/pyspark/pandas/tests/window/test_rolling_adv.py b/python/pyspark/pandas/tests/window/test_rolling_adv.py index 47bdab8705b6b..983e164771fe0 100644 --- a/python/pyspark/pandas/tests/window/test_rolling_adv.py +++ b/python/pyspark/pandas/tests/window/test_rolling_adv.py @@ -20,6 +20,9 @@ class RollingAdvMixin(RollingTestingFuncMixin): + def test_rolling_median(self): + self._test_rolling_func("median", lambda x: x.quantile(0.5, "lower")) + def test_rolling_quantile(self): self._test_rolling_func(lambda x: x.quantile(0.5), lambda x: x.quantile(0.5, "lower")) diff --git a/python/pyspark/pandas/window.py b/python/pyspark/pandas/window.py index 294923d5f50e7..89733a0ca06c7 100644 --- a/python/pyspark/pandas/window.py +++ b/python/pyspark/pandas/window.py @@ -110,6 +110,9 @@ def quantile(scol: Column) -> Column: return self._apply_as_series_or_frame(quantile) + def median(self) -> FrameLike: + return self.quantile(0.5) + def std(self) -> FrameLike: def std(scol: Column) -> Column: return F.when( @@ -674,6 +677,64 @@ def quantile(self, quantile: float, accuracy: int = 10000) -> FrameLike: """ return super().quantile(quantile, accuracy) + def median(self) -> FrameLike: + """ + Calculate the rolling median. + + .. versionadded:: 4.3.0 + + .. note:: the current implementation of this API uses Spark's Window without + specifying partition specification. This leads to move all data into + single partition in single machine and could cause serious + performance degradation. Avoid this method against very large dataset. + + Returns + ------- + Series or DataFrame + Returned object type is determined by the caller of the rolling + calculation. + + See Also + -------- + pyspark.pandas.Series.rolling : Calling object with Series data. + pyspark.pandas.DataFrame.rolling : Calling object with DataFrames. + pyspark.pandas.Series.median : Equivalent method for Series. + pyspark.pandas.DataFrame.median : Equivalent method for DataFrame. + + Notes + ----- + `median` in pandas-on-Spark uses distributed percentile approximation + algorithm unlike pandas, the result might be different with pandas. + + Examples + -------- + >>> s = ps.Series([4, 3, 5, 2, 6]) + >>> s + 0 4 + 1 3 + 2 5 + 3 2 + 4 6 + dtype: int64 + + >>> s.rolling(2).median() + 0 NaN + 1 3.0 + 2 3.0 + 3 2.0 + 4 2.0 + dtype: float64 + + >>> s.rolling(3).median() + 0 NaN + 1 NaN + 2 4.0 + 3 3.0 + 4 5.0 + dtype: float64 + """ + return super().median() + def std(self) -> FrameLike: """ Calculate rolling standard deviation. @@ -1320,6 +1381,67 @@ def quantile(self, quantile: float, accuracy: int = 10000) -> FrameLike: """ return super().quantile(quantile, accuracy) + def median(self) -> FrameLike: + """ + Calculate the rolling median. + + .. versionadded:: 4.3.0 + + Returns + ------- + Series or DataFrame + Returned object type is determined by the caller of the rolling + calculation. + + See Also + -------- + pyspark.pandas.Series.rolling : Calling object with Series data. + pyspark.pandas.DataFrame.rolling : Calling object with DataFrames. + pyspark.pandas.Series.median : Equivalent method for Series. + pyspark.pandas.DataFrame.median : Equivalent method for DataFrame. + + Notes + ----- + `median` in pandas-on-Spark uses distributed percentile approximation + algorithm unlike pandas, the result might be different with pandas. + + Examples + -------- + >>> s = ps.Series([2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5]) + >>> s.groupby(s).rolling(3).median().sort_index() + 2 0 NaN + 1 NaN + 3 2 NaN + 3 NaN + 4 3.0 + 4 5 NaN + 6 NaN + 7 4.0 + 8 4.0 + 5 9 NaN + 10 NaN + dtype: float64 + + For DataFrame, each rolling median is computed column-wise. + + >>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2}) + >>> df.groupby(df.A).rolling(2).median().sort_index() + B + A + 2 0 NaN + 1 4.0 + 3 2 NaN + 3 9.0 + 4 9.0 + 4 5 NaN + 6 16.0 + 7 16.0 + 8 16.0 + 5 9 NaN + 10 25.0 + """ + return super().median() + def std(self) -> FrameLike: """ Calculate rolling standard deviation. @@ -1727,6 +1849,64 @@ def quantile(self, quantile: float, accuracy: int = 10000) -> FrameLike: """ return super().quantile(quantile, accuracy) + def median(self) -> FrameLike: + """ + Calculate the expanding median. + + .. versionadded:: 4.3.0 + + .. note:: the current implementation of this API uses Spark's Window without + specifying partition specification. This leads to move all data into + single partition in single machine and could cause serious + performance degradation. Avoid this method against very large dataset. + + Returns + ------- + Series or DataFrame + Returned object type is determined by the caller of the expanding + calculation. + + See Also + -------- + pyspark.pandas.Series.expanding : Calling object with Series data. + pyspark.pandas.DataFrame.expanding : Calling object with DataFrames. + pyspark.pandas.Series.median : Equivalent method for Series. + pyspark.pandas.DataFrame.median : Equivalent method for DataFrame. + + Notes + ----- + `median` in pandas-on-Spark uses distributed percentile approximation + algorithm unlike pandas, the result might be different with pandas. + + Examples + -------- + >>> s = ps.Series([4, 3, 5, 2, 6]) + >>> s + 0 4 + 1 3 + 2 5 + 3 2 + 4 6 + dtype: int64 + + >>> s.expanding(2).median() + 0 NaN + 1 3.0 + 2 4.0 + 3 3.0 + 4 4.0 + dtype: float64 + + >>> s.expanding(3).median() + 0 NaN + 1 NaN + 2 4.0 + 3 3.0 + 4 4.0 + dtype: float64 + """ + return super().median() + def std(self) -> FrameLike: """ Calculate expanding standard deviation. @@ -2293,6 +2473,49 @@ def quantile(self, quantile: float, accuracy: int = 10000) -> FrameLike: """ return super().quantile(quantile, accuracy) + def median(self) -> FrameLike: + """ + Calculate the expanding median. + + .. versionadded:: 4.3.0 + + Returns + ------- + Series or DataFrame + Returned object type is determined by the caller of the expanding + calculation. + + See Also + -------- + pyspark.pandas.Series.expanding : Calling expanding with Series data. + pyspark.pandas.DataFrame.expanding : Calling expanding with DataFrames. + pyspark.pandas.Series.median : Equivalent method for Series. + pyspark.pandas.DataFrame.median : Equivalent method for DataFrame. + + Notes + ----- + `median` in pandas-on-Spark uses distributed percentile approximation + algorithm unlike pandas, the result might be different with pandas. + + Examples + -------- + >>> s = ps.Series([2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5]) + >>> s.groupby(s).expanding(3).median().sort_index() + 2 0 NaN + 1 NaN + 3 2 NaN + 3 NaN + 4 3.0 + 4 5 NaN + 6 NaN + 7 4.0 + 8 4.0 + 5 9 NaN + 10 NaN + dtype: float64 + """ + return super().median() + def std(self) -> FrameLike: """ Calculate expanding standard deviation.