Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions python/docs/source/reference/pyspark.pandas/window.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ Standard moving window functions
Rolling.max
Rolling.mean
Rolling.median
Rolling.quantile

Standard expanding window functions
-----------------------------------
Expand All @@ -52,6 +53,7 @@ Standard expanding window functions
Expanding.max
Expanding.mean
Expanding.median
Expanding.quantile

Exponential moving window functions
-----------------------------------
Expand Down
4 changes: 0 additions & 4 deletions python/pyspark/pandas/missing/window.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ class MissingPandasLikeExpanding:
apply = _unsupported_function_expanding("apply")
corr = _unsupported_function_expanding("corr")
cov = _unsupported_function_expanding("cov")
median = _unsupported_function_expanding("median")
validate = _unsupported_function_expanding("validate")

exclusions = _unsupported_property_expanding("exclusions")
Expand All @@ -99,7 +98,6 @@ class MissingPandasLikeRolling:
apply = _unsupported_function_rolling("apply")
corr = _unsupported_function_rolling("corr")
cov = _unsupported_function_rolling("cov")
median = _unsupported_function_rolling("median")
validate = _unsupported_function_rolling("validate")

exclusions = _unsupported_property_rolling("exclusions")
Expand All @@ -117,7 +115,6 @@ class MissingPandasLikeExpandingGroupby:
apply = _unsupported_function_expanding("apply")
corr = _unsupported_function_expanding("corr")
cov = _unsupported_function_expanding("cov")
median = _unsupported_function_expanding("median")
validate = _unsupported_function_expanding("validate")

exclusions = _unsupported_property_expanding("exclusions")
Expand All @@ -135,7 +132,6 @@ class MissingPandasLikeRollingGroupby:
apply = _unsupported_function_rolling("apply")
corr = _unsupported_function_rolling("corr")
cov = _unsupported_function_rolling("cov")
median = _unsupported_function_rolling("median")
validate = _unsupported_function_rolling("validate")

exclusions = _unsupported_property_rolling("exclusions")
Expand Down
3 changes: 3 additions & 0 deletions python/pyspark/pandas/tests/window/test_expanding_adv.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@


class ExpandingAdvMixin(ExpandingTestingFuncMixin):
def test_expanding_median(self):
    """Expanding.median should match pandas' ``quantile(0.5, "lower")``."""

    # Spark's approx-percentile-based median corresponds to pandas'
    # "lower" interpolation, so that is the pandas-side reference.
    def pandas_median(x):
        return x.quantile(0.5, "lower")

    self._test_expanding_func("median", pandas_median)

def test_expanding_quantile(self):
    """Expanding.quantile(0.5) should match pandas' ``quantile(0.5, "lower")``."""

    def ps_quantile(x):
        return x.quantile(0.5)

    def pandas_quantile(x):
        return x.quantile(0.5, "lower")

    self._test_expanding_func(ps_quantile, pandas_quantile)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@


class GroupByExpandingAdvMixin(GroupByExpandingTestingFuncMixin):
def test_groupby_expanding_median(self):
    """GroupBy.expanding().median() should match pandas' ``quantile(0.5, "lower")``."""

    def pandas_median(x):
        return x.quantile(0.5, "lower")

    self._test_groupby_expanding_func("median", pandas_median)

def test_groupby_expanding_quantile(self):
self._test_groupby_expanding_func(
lambda x: x.quantile(0.5), lambda x: x.quantile(0.5, "lower")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@


class GroupByRollingAdvMixin(GroupByRollingTestingFuncMixin):
def test_groupby_rolling_median(self):
    """GroupBy.rolling().median() should match pandas' ``quantile(0.5, "lower")``."""

    def pandas_median(x):
        return x.quantile(0.5, "lower")

    self._test_groupby_rolling_func("median", pandas_median)

def test_groupby_rolling_quantile(self):
self._test_groupby_rolling_func(
lambda x: x.quantile(0.5), lambda x: x.quantile(0.5, "lower")
Expand Down
3 changes: 3 additions & 0 deletions python/pyspark/pandas/tests/window/test_rolling_adv.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@


class RollingAdvMixin(RollingTestingFuncMixin):
def test_rolling_median(self):
    """Rolling.median should match pandas' ``quantile(0.5, "lower")``."""

    def pandas_median(x):
        return x.quantile(0.5, "lower")

    self._test_rolling_func("median", pandas_median)

def test_rolling_quantile(self):
    """Rolling.quantile(0.5) should match pandas' ``quantile(0.5, "lower")``."""

    def ps_quantile(x):
        return x.quantile(0.5)

    def pandas_quantile(x):
        return x.quantile(0.5, "lower")

    self._test_rolling_func(ps_quantile, pandas_quantile)

Expand Down
223 changes: 223 additions & 0 deletions python/pyspark/pandas/window.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ def quantile(scol: Column) -> Column:

return self._apply_as_series_or_frame(quantile)

def median(self) -> FrameLike:
    """Return the window median as the 0.5 quantile.

    Delegates to :meth:`quantile` (with its default accuracy), so the result
    uses the same distributed percentile approximation as ``quantile(0.5)``.
    """
    return self.quantile(0.5)

def std(self) -> FrameLike:
def std(scol: Column) -> Column:
return F.when(
Expand Down Expand Up @@ -674,6 +677,64 @@ def quantile(self, quantile: float, accuracy: int = 10000) -> FrameLike:
"""
return super().quantile(quantile, accuracy)

def median(self) -> FrameLike:
    """
    Calculate the rolling median.

    .. versionadded:: 4.3.0

    .. note:: the current implementation of this API uses Spark's Window without
        specifying partition specification. This leads to move all data into
        single partition in single machine and could cause serious
        performance degradation. Avoid this method against very large dataset.

    Returns
    -------
    Series or DataFrame
        Returned object type is determined by the caller of the rolling
        calculation.

    See Also
    --------
    pyspark.pandas.Series.rolling : Calling object with Series data.
    pyspark.pandas.DataFrame.rolling : Calling object with DataFrames.
    pyspark.pandas.Series.median : Equivalent method for Series.
    pyspark.pandas.DataFrame.median : Equivalent method for DataFrame.

    Notes
    -----
    `median` in pandas-on-Spark uses a distributed percentile approximation
    algorithm, unlike pandas, so the result might differ from pandas.

    Examples
    --------
    >>> s = ps.Series([4, 3, 5, 2, 6])
    >>> s
    0    4
    1    3
    2    5
    3    2
    4    6
    dtype: int64

    >>> s.rolling(2).median()
    0    NaN
    1    3.0
    2    3.0
    3    2.0
    4    2.0
    dtype: float64

    >>> s.rolling(3).median()
    0    NaN
    1    NaN
    2    4.0
    3    3.0
    4    5.0
    dtype: float64
    """
    # The median is the 0.5 quantile; delegate to the shared implementation.
    return super().median()

def std(self) -> FrameLike:
"""
Calculate rolling standard deviation.
Expand Down Expand Up @@ -1320,6 +1381,67 @@ def quantile(self, quantile: float, accuracy: int = 10000) -> FrameLike:
"""
return super().quantile(quantile, accuracy)

def median(self) -> FrameLike:
    """
    Calculate the rolling median.

    .. versionadded:: 4.3.0

    Returns
    -------
    Series or DataFrame
        Returned object type is determined by the caller of the rolling
        calculation.

    See Also
    --------
    pyspark.pandas.Series.rolling : Calling object with Series data.
    pyspark.pandas.DataFrame.rolling : Calling object with DataFrames.
    pyspark.pandas.Series.median : Equivalent method for Series.
    pyspark.pandas.DataFrame.median : Equivalent method for DataFrame.

    Notes
    -----
    `median` in pandas-on-Spark uses a distributed percentile approximation
    algorithm, unlike pandas, so the result might differ from pandas.

    Examples
    --------
    >>> s = ps.Series([2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5])
    >>> s.groupby(s).rolling(3).median().sort_index()
    2  0     NaN
       1     NaN
    3  2     NaN
       3     NaN
       4     3.0
    4  5     NaN
       6     NaN
       7     4.0
       8     4.0
    5  9     NaN
       10    NaN
    dtype: float64

    For DataFrame, each rolling median is computed column-wise.

    >>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2})
    >>> df.groupby(df.A).rolling(2).median().sort_index()
              B
    A
    2 0     NaN
      1     4.0
    3 2     NaN
      3     9.0
      4     9.0
    4 5     NaN
      6    16.0
      7    16.0
      8    16.0
    5 9     NaN
      10   25.0
    """
    # The median is the 0.5 quantile; delegate to the shared implementation.
    return super().median()

def std(self) -> FrameLike:
"""
Calculate rolling standard deviation.
Expand Down Expand Up @@ -1727,6 +1849,64 @@ def quantile(self, quantile: float, accuracy: int = 10000) -> FrameLike:
"""
return super().quantile(quantile, accuracy)

def median(self) -> FrameLike:
    """
    Calculate the expanding median.

    .. versionadded:: 4.3.0

    .. note:: the current implementation of this API uses Spark's Window without
        specifying partition specification. This leads to move all data into
        single partition in single machine and could cause serious
        performance degradation. Avoid this method against very large dataset.

    Returns
    -------
    Series or DataFrame
        Returned object type is determined by the caller of the expanding
        calculation.

    See Also
    --------
    pyspark.pandas.Series.expanding : Calling object with Series data.
    pyspark.pandas.DataFrame.expanding : Calling object with DataFrames.
    pyspark.pandas.Series.median : Equivalent method for Series.
    pyspark.pandas.DataFrame.median : Equivalent method for DataFrame.

    Notes
    -----
    `median` in pandas-on-Spark uses a distributed percentile approximation
    algorithm, unlike pandas, so the result might differ from pandas.

    Examples
    --------
    >>> s = ps.Series([4, 3, 5, 2, 6])
    >>> s
    0    4
    1    3
    2    5
    3    2
    4    6
    dtype: int64

    >>> s.expanding(2).median()
    0    NaN
    1    3.0
    2    4.0
    3    3.0
    4    4.0
    dtype: float64

    >>> s.expanding(3).median()
    0    NaN
    1    NaN
    2    4.0
    3    3.0
    4    4.0
    dtype: float64
    """
    # The median is the 0.5 quantile; delegate to the shared implementation.
    return super().median()

def std(self) -> FrameLike:
"""
Calculate expanding standard deviation.
Expand Down Expand Up @@ -2293,6 +2473,49 @@ def quantile(self, quantile: float, accuracy: int = 10000) -> FrameLike:
"""
return super().quantile(quantile, accuracy)

def median(self) -> FrameLike:
    """
    Calculate the expanding median.

    .. versionadded:: 4.3.0

    Returns
    -------
    Series or DataFrame
        Returned object type is determined by the caller of the expanding
        calculation.

    See Also
    --------
    pyspark.pandas.Series.expanding : Calling object with Series data.
    pyspark.pandas.DataFrame.expanding : Calling object with DataFrames.
    pyspark.pandas.Series.median : Equivalent method for Series.
    pyspark.pandas.DataFrame.median : Equivalent method for DataFrame.

    Notes
    -----
    `median` in pandas-on-Spark uses a distributed percentile approximation
    algorithm, unlike pandas, so the result might differ from pandas.

    Examples
    --------
    >>> s = ps.Series([2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5])
    >>> s.groupby(s).expanding(3).median().sort_index()
    2  0     NaN
       1     NaN
    3  2     NaN
       3     NaN
       4     3.0
    4  5     NaN
       6     NaN
       7     4.0
       8     4.0
    5  9     NaN
       10    NaN
    dtype: float64
    """
    # The median is the 0.5 quantile; delegate to the shared implementation.
    return super().median()

def std(self) -> FrameLike:
"""
Calculate expanding standard deviation.
Expand Down