From 069247a8d093b71d4db5d162d77931218c810116 Mon Sep 17 00:00:00 2001
From: Ruifeng Zheng
Date: Wed, 19 Mar 2025 10:48:02 +0800
Subject: [PATCH 1/2] fix

---
 python/pyspark/pandas/namespace.py        | 31 +++++++++--
 .../tests/io/test_dataframe_spark_io.py   | 53 +++----------------
 2 files changed, 34 insertions(+), 50 deletions(-)

diff --git a/python/pyspark/pandas/namespace.py b/python/pyspark/pandas/namespace.py
index d31bc1f48d112..a44c634766abf 100644
--- a/python/pyspark/pandas/namespace.py
+++ b/python/pyspark/pandas/namespace.py
@@ -65,6 +65,7 @@
     StringType,
     DateType,
     StructType,
+    StructField,
     DataType,
 )
 from pyspark.sql.dataframe import DataFrame as PySparkDataFrame
@@ -85,6 +86,7 @@
 from pyspark.pandas.frame import DataFrame, _reduce_spark_multi
 from pyspark.pandas.internal import (
     InternalFrame,
+    InternalField,
     DEFAULT_SERIES_NAME,
     HIDDEN_COLUMNS,
     SPARK_INDEX_NAME_FORMAT,
@@ -1186,9 +1188,29 @@ def read_excel_on_spark(
             pdf = pdf_or_pser
 
         psdf = cast(DataFrame, from_pandas(pdf))
-        return_schema = force_decimal_precision_scale(
-            as_nullable_spark_type(psdf._internal.spark_frame.drop(*HIDDEN_COLUMNS).schema)
-        )
+
+        index_scol_names = psdf._internal.index_spark_column_names
+        raw_schema = psdf._internal.spark_frame.drop(*HIDDEN_COLUMNS).schema
+        nullable_fields = []
+        for field in raw_schema.fields:
+            if field.name in index_scol_names:
+                print(field)
+                nullable_fields.append(field)
+            else:
+                nullable_fields.append(
+                    StructField(
+                        field.name,
+                        as_nullable_spark_type(field.dataType),
+                        nullable=True,
+                        metadata=field.metadata,
+                    )
+                )
+        nullable_schema = StructType(nullable_fields)
+        return_schema = force_decimal_precision_scale(nullable_schema)
+
+        return_data_fields: Optional[List[InternalField]] = None
+        if psdf._internal.data_fields is not None:
+            return_data_fields = [f.normalize_spark_type() for f in psdf._internal.data_fields]
 
         def output_func(pdf: pd.DataFrame) -> pd.DataFrame:
             pdf = pd.concat([pd_read_excel(bin, sn=sn) for bin in pdf[pdf.columns[0]]])
@@ -1211,8 +1233,7 @@ def output_func(pdf: pd.DataFrame) -> pd.DataFrame:
             .select("content")
             .mapInPandas(lambda iterator: map(output_func, iterator), schema=return_schema)
         )
-
-        return DataFrame(psdf._internal.with_new_sdf(sdf))
+        return DataFrame(psdf._internal.with_new_sdf(sdf, data_fields=return_data_fields))
 
     if isinstance(pdf_or_psers, dict):
         return {
diff --git a/python/pyspark/pandas/tests/io/test_dataframe_spark_io.py b/python/pyspark/pandas/tests/io/test_dataframe_spark_io.py
index b8225b10f1c79..74f152172e3dc 100644
--- a/python/pyspark/pandas/tests/io/test_dataframe_spark_io.py
+++ b/python/pyspark/pandas/tests/io/test_dataframe_spark_io.py
@@ -253,8 +253,6 @@ def test_spark_io(self):
             expected_idx.sort_values(by="f").to_spark().toPandas(),
         )
 
-    # TODO(SPARK-40353): re-enabling the `test_read_excel`.
-    @unittest.skip("openpyxl")
     def test_read_excel(self):
         with self.temp_dir() as tmp:
             path1 = "{}/file1.xlsx".format(tmp)
@@ -266,21 +264,22 @@ def test_read_excel(self):
                 pd.read_excel(open(path1, "rb"), index_col=0),
             )
             self.assert_eq(
-                ps.read_excel(open(path1, "rb"), index_col=0, squeeze=True),
-                pd.read_excel(open(path1, "rb"), index_col=0, squeeze=True),
+                ps.read_excel(open(path1, "rb"), index_col=0),
+                pd.read_excel(open(path1, "rb"), index_col=0),
             )
 
             self.assert_eq(ps.read_excel(path1), pd.read_excel(path1))
             self.assert_eq(ps.read_excel(path1, index_col=0), pd.read_excel(path1, index_col=0))
             self.assert_eq(
-                ps.read_excel(path1, index_col=0, squeeze=True),
-                pd.read_excel(path1, index_col=0, squeeze=True),
+                ps.read_excel(path1, index_col=0),
+                pd.read_excel(path1, index_col=0),
             )
             self.assert_eq(ps.read_excel(tmp), pd.read_excel(path1))
 
             path2 = "{}/file2.xlsx".format(tmp)
             self.test_pdf[["i32"]].to_excel(path2)
+            print(ps.read_excel(tmp, index_col=0).sort_index())
             self.assert_eq(
                 ps.read_excel(tmp, index_col=0).sort_index(),
                 pd.concat(
                     [pd.read_excel(path1, index_col=0), pd.read_excel(path2, index_col=0)]
                 ).sort_index(),
             )
             self.assert_eq(
-                ps.read_excel(tmp, index_col=0, squeeze=True).sort_index(),
+                ps.read_excel(tmp, index_col=0).sort_index(),
                 pd.concat(
                     [
-                        pd.read_excel(path1, index_col=0, squeeze=True),
-                        pd.read_excel(path2, index_col=0, squeeze=True),
+                        pd.read_excel(path1, index_col=0),
+                        pd.read_excel(path2, index_col=0),
                     ]
                 ).sort_index(),
             )
@@ -306,21 +305,12 @@ def test_read_excel(self):
             path1 = "{}/file1.xlsx".format(tmp)
             with pd.ExcelWriter(path1) as writer:
                 self.test_pdf.to_excel(writer, sheet_name="Sheet_name_1")
                 self.test_pdf[["i32"]].to_excel(writer, sheet_name="Sheet_name_2")
 
             sheet_names = [["Sheet_name_1", "Sheet_name_2"], None]
 
             pdfs1 = pd.read_excel(open(path1, "rb"), sheet_name=None, index_col=0)
-            pdfs1_squeezed = pd.read_excel(
-                open(path1, "rb"), sheet_name=None, index_col=0, squeeze=True
-            )
 
             for sheet_name in sheet_names:
                 psdfs = ps.read_excel(open(path1, "rb"), sheet_name=sheet_name, index_col=0)
                 self.assert_eq(psdfs["Sheet_name_1"], pdfs1["Sheet_name_1"])
                 self.assert_eq(psdfs["Sheet_name_2"], pdfs1["Sheet_name_2"])
 
-                psdfs = ps.read_excel(
-                    open(path1, "rb"), sheet_name=sheet_name, index_col=0, squeeze=True
-                )
-                self.assert_eq(psdfs["Sheet_name_1"], pdfs1_squeezed["Sheet_name_1"])
-                self.assert_eq(psdfs["Sheet_name_2"], pdfs1_squeezed["Sheet_name_2"])
-
             self.assert_eq(
                 ps.read_excel(tmp, index_col=0, sheet_name="Sheet_name_2"),
                 pdfs1["Sheet_name_2"],
             )
 
             for sheet_name in sheet_names:
                 psdfs = ps.read_excel(tmp, sheet_name=sheet_name, index_col=0)
@@ -331,30 +321,17 @@ def test_read_excel(self):
                 self.assert_eq(psdfs["Sheet_name_1"], pdfs1["Sheet_name_1"])
                 self.assert_eq(psdfs["Sheet_name_2"], pdfs1["Sheet_name_2"])
 
-                psdfs = ps.read_excel(tmp, sheet_name=sheet_name, index_col=0, squeeze=True)
-                self.assert_eq(psdfs["Sheet_name_1"], pdfs1_squeezed["Sheet_name_1"])
-                self.assert_eq(psdfs["Sheet_name_2"], pdfs1_squeezed["Sheet_name_2"])
-
             path2 = "{}/file2.xlsx".format(tmp)
             with pd.ExcelWriter(path2) as writer:
                 self.test_pdf.to_excel(writer, sheet_name="Sheet_name_1")
                 self.test_pdf[["i32"]].to_excel(writer, sheet_name="Sheet_name_2")
 
             pdfs2 = pd.read_excel(path2, sheet_name=None, index_col=0)
-            pdfs2_squeezed = pd.read_excel(path2, sheet_name=None, index_col=0, squeeze=True)
 
             self.assert_eq(
                 ps.read_excel(tmp, sheet_name="Sheet_name_2", index_col=0).sort_index(),
                 pd.concat([pdfs1["Sheet_name_2"], pdfs2["Sheet_name_2"]]).sort_index(),
             )
-            self.assert_eq(
-                ps.read_excel(
-                    tmp, sheet_name="Sheet_name_2", index_col=0, squeeze=True
-                ).sort_index(),
-                pd.concat(
-                    [pdfs1_squeezed["Sheet_name_2"], pdfs2_squeezed["Sheet_name_2"]]
-                ).sort_index(),
-            )
 
             for sheet_name in sheet_names:
                 psdfs = ps.read_excel(tmp, sheet_name=sheet_name, index_col=0)
@@ -367,20 +344,6 @@ def test_read_excel(self):
                 pd.concat([pdfs1["Sheet_name_2"], pdfs2["Sheet_name_2"]]).sort_index(),
             )
 
-            psdfs = ps.read_excel(tmp, sheet_name=sheet_name, index_col=0, squeeze=True)
-            self.assert_eq(
-                psdfs["Sheet_name_1"].sort_index(),
-                pd.concat(
-                    [pdfs1_squeezed["Sheet_name_1"], pdfs2_squeezed["Sheet_name_1"]]
-                ).sort_index(),
-            )
-            self.assert_eq(
-                psdfs["Sheet_name_2"].sort_index(),
-                pd.concat(
-                    [pdfs1_squeezed["Sheet_name_2"], pdfs2_squeezed["Sheet_name_2"]]
-                ).sort_index(),
-            )
-
     def test_read_orc(self):
         with self.temp_dir() as tmp:
             path = "{}/file1.orc".format(tmp)

From c9d7ff450498e80840d1d9dd59a48dcaf2a7bd56 Mon Sep 17 00:00:00 2001
From: Ruifeng Zheng
Date: Wed, 19 Mar 2025 16:41:22 +0800
Subject: [PATCH 2/2] fix

---
 python/pyspark/pandas/namespace.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/python/pyspark/pandas/namespace.py b/python/pyspark/pandas/namespace.py
index a44c634766abf..a5c5a32037b8b 100644
--- a/python/pyspark/pandas/namespace.py
+++ b/python/pyspark/pandas/namespace.py
@@ -1189,12 +1189,11 @@ def read_excel_on_spark(
 
         psdf = cast(DataFrame, from_pandas(pdf))
 
-        index_scol_names = psdf._internal.index_spark_column_names
         raw_schema = psdf._internal.spark_frame.drop(*HIDDEN_COLUMNS).schema
+        index_scol_names = psdf._internal.index_spark_column_names
         nullable_fields = []
         for field in raw_schema.fields:
             if field.name in index_scol_names:
-                print(field)
                 nullable_fields.append(field)
             else:
                 nullable_fields.append(
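A note on what the series does. PATCH 1/2 re-enables `test_read_excel` and, instead of running `as_nullable_spark_type` over the whole frame schema, rebuilds it field by field: index columns keep their original `StructField` (including `nullable=False`), while data columns are relaxed to `nullable=True`, presumably because cells read back from Excel can be missing. It also normalizes the cached `InternalField`s via `normalize_spark_type()` so they agree with the relaxed schema handed to `mapInPandas`. PATCH 2/2 then drops a leftover debug `print(field)` and reorders two assignments.

Below is a minimal, self-contained sketch of that per-field rewrite, using only `pyspark.sql.types`. The schema and the index name `__index_level_0__` are illustrative assumptions (hypothetical columns `i32` and `s`), not taken from the patch; the real code additionally recurses into nested types with `as_nullable_spark_type` and applies `force_decimal_precision_scale`:

    from pyspark.sql.types import LongType, StringType, StructField, StructType

    # Hypothetical input schema: one index column plus two data columns,
    # all non-nullable, standing in for psdf._internal.spark_frame's schema.
    raw_schema = StructType(
        [
            StructField("__index_level_0__", LongType(), nullable=False),
            StructField("i32", LongType(), nullable=False),
            StructField("s", StringType(), nullable=False),
        ]
    )
    index_scol_names = ["__index_level_0__"]

    nullable_fields = []
    for field in raw_schema.fields:
        if field.name in index_scol_names:
            # Index columns pass through unchanged, keeping nullable=False.
            nullable_fields.append(field)
        else:
            # Data columns are rewritten as nullable; metadata is preserved.
            nullable_fields.append(
                StructField(field.name, field.dataType, nullable=True, metadata=field.metadata)
            )

    for f in StructType(nullable_fields).fields:
        print(f.name, f.dataType.simpleString(), f.nullable)
    # __index_level_0__ bigint False
    # i32 bigint True
    # s string True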