From 1875fb5d231ffaf308c248873dcea9155322104a Mon Sep 17 00:00:00 2001
From: Marc Garcia
Date: Tue, 27 May 2025 19:09:30 +0200
Subject: [PATCH 01/10] Update docs to include to_iceberg

---
 doc/source/getting_started/install.rst | 2 +-
 doc/source/reference/io.rst            | 1 +
 doc/source/user_guide/io.rst           | 2 +-
 doc/source/whatsnew/v3.0.0.rst         | 2 +-
 4 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/doc/source/getting_started/install.rst b/doc/source/getting_started/install.rst
index 93663c1cced7e..1589fea5f8953 100644
--- a/doc/source/getting_started/install.rst
+++ b/doc/source/getting_started/install.rst
@@ -308,7 +308,7 @@ Dependency Minimum Version pip ex
 `zlib `__ hdf5 Compression for HDF5
 `fastparquet `__ 2024.2.0 - Parquet reading / writing (pyarrow is default)
 `pyarrow `__ 10.0.1 parquet, feather Parquet, ORC, and feather reading / writing
-`PyIceberg `__ 0.7.1 iceberg Apache Iceberg reading
+`PyIceberg `__ 0.7.1 iceberg Apache Iceberg reading / writing
 `pyreadstat `__ 1.2.6 spss SPSS files (.sav) reading
 `odfpy `__ 1.4.1 excel Open document format (.odf, .ods, .odt) reading / writing
 ====================================================== ================== ================ ==========================================================
diff --git a/doc/source/reference/io.rst b/doc/source/reference/io.rst
index 6e5992916f800..37d9e7f6b7dbd 100644
--- a/doc/source/reference/io.rst
+++ b/doc/source/reference/io.rst
@@ -162,6 +162,7 @@ Iceberg
    :toctree: api/

    read_iceberg
+   DataFrame.to_iceberg

 .. warning:: ``read_iceberg`` is experimental and may change without warning.
diff --git a/doc/source/user_guide/io.rst b/doc/source/user_guide/io.rst
index 2a7cab701eecf..3e09027f7ada8 100644
--- a/doc/source/user_guide/io.rst
+++ b/doc/source/user_guide/io.rst
@@ -29,7 +29,7 @@ The pandas I/O API is a set of top level ``reader`` functions accessed like
     binary,`HDF5 Format `__, :ref:`read_hdf`, :ref:`to_hdf`
     binary,`Feather Format `__, :ref:`read_feather`, :ref:`to_feather`
     binary,`Parquet Format `__, :ref:`read_parquet`, :ref:`to_parquet`
-    binary,`Apache Iceberg `__, :ref:`read_iceberg` , NA
+    binary,`Apache Iceberg `__, :ref:`read_iceberg` , :ref:`to_iceberg`
     binary,`ORC Format `__, :ref:`read_orc`, :ref:`to_orc`
     binary,`Stata `__, :ref:`read_stata`, :ref:`to_stata`
     binary,`SAS `__, :ref:`read_sas` , NA
diff --git a/doc/source/whatsnew/v3.0.0.rst b/doc/source/whatsnew/v3.0.0.rst
index 099e5bc48353a..e93c7d15986d4 100644
--- a/doc/source/whatsnew/v3.0.0.rst
+++ b/doc/source/whatsnew/v3.0.0.rst
@@ -78,7 +78,7 @@ Other enhancements
 - :py:class:`frozenset` elements in pandas objects are now natively printed (:issue:`60690`)
 - Add ``"delete_rows"`` option to ``if_exists`` argument in :meth:`DataFrame.to_sql` deleting all records of the table before inserting data (:issue:`37210`).
 - Added half-year offset classes :class:`HalfYearBegin`, :class:`HalfYearEnd`, :class:`BHalfYearBegin` and :class:`BHalfYearEnd` (:issue:`60928`)
-- Added support to read from Apache Iceberg tables with the new :func:`read_iceberg` function (:issue:`61383`)
+- Added support to read and write from and to Apache Iceberg tables with the new :func:`read_iceberg` :meth:`DataFrame.to_iceberg` functions (:issue:`61383`)
 - Errors occurring during SQL I/O will now throw a generic :class:`.DatabaseError` instead of the raw Exception type from the underlying driver manager library (:issue:`60748`)
 - Implemented :meth:`Series.str.isascii` and :meth:`Series.str.isascii` (:issue:`59091`)
 - Improved deprecation message for offset aliases (:issue:`60820`)

From b31ae80710a5b63621f77479873488c55ee55908 Mon Sep 17 00:00:00 2001
From: Marc Garcia
Date: Tue, 27 May 2025 23:32:12 +0200
Subject: [PATCH 02/10] Implementing to_iceberg

---
 pandas/core/frame.py            | 48 +++++++++++++++++++++++
 pandas/io/iceberg.py            | 67 ++++++++++++++++++++++++++++++---
 pandas/tests/io/test_iceberg.py | 40 +++++++++++++++++++-
 3 files changed, 148 insertions(+), 7 deletions(-)

diff --git a/pandas/core/frame.py b/pandas/core/frame.py
index b2c1e38f61f4c..68ce3baa959b3 100644
--- a/pandas/core/frame.py
+++ b/pandas/core/frame.py
@@ -3547,6 +3547,54 @@ def to_xml(

         return xml_formatter.write_output()

+    def to_iceberg(
+        self,
+        table_identifier: str,
+        catalog_name: str | None = None,
+        *,
+        catalog_properties: dict[str, Any] | None = None,
+        location: str | None = None,
+        snapshot_properties: dict[str, str] | None = None,
+    ):
+        """
+        Write a DataFrame to an Apache Iceberg table.
+
+        .. versionadded:: 3.0.0
+
+        Parameters
+        ----------
+        table_identifier : str
+            Table identifier.
+        catalog_name : str, optional
+            The name of the catalog.
+        catalog_properties : dict of {str: str}, optional
+            The properties that are used next to the catalog configuration.
+        location : str, optional
+            Location for the table.
+        snapshot_properties : dict of {str: str}, optional
+            Custom properties to be added to the snapshot summary
+
+        See Also
+        --------
+        read_iceberg : Read an Apache Iceberg table.
+        DataFrame.to_parquet : Write a DataFrame in Parquet format.
+
+        Examples
+        --------
+        >>> df = pd.DataFrame(data={"col1": [1, 2], "col2": [4, 3]})
+        >>> df.to_iceberg("my_table", catalog_name="my_catalog")
+        """
+        from pandas.io.iceberg import to_iceberg
+
+        return to_iceberg(
+            self,
+            table_identifier,
+            catalog_name,
+            catalog_properties=catalog_properties,
+            location=location,
+            snapshot_properties=snapshot_properties,
+        )
+
     # ----------------------------------------------------------------------
     @doc(INFO_DOCSTRING, **frame_sub_kwargs)
     def info(
diff --git a/pandas/io/iceberg.py b/pandas/io/iceberg.py
index 8a3e8f5da49b3..f0c2e9516dc30 100644
--- a/pandas/io/iceberg.py
+++ b/pandas/io/iceberg.py
@@ -7,9 +7,17 @@
 from pandas import DataFrame


+def _get_catalog(catalog_name: str | None, catalog_properties: dict[str, Any] | None):
+    pyiceberg_catalog = import_optional_dependency("pyiceberg.catalog")
+    if catalog_properties is None:
+        catalog_properties = {}
+    return pyiceberg_catalog.load_catalog(catalog_name, **catalog_properties)
+
+
 def read_iceberg(
     table_identifier: str,
     catalog_name: str | None = None,
+    *,
     catalog_properties: dict[str, Any] | None = None,
     row_filter: str | None = None,
     selected_fields: tuple[str] | None = None,
@@ -69,12 +77,8 @@ def read_iceberg(
     ...     selected_fields=("VendorID", "tpep_pickup_datetime"),
     ... )  # doctest: +SKIP
     """
-    pyiceberg_catalog = import_optional_dependency("pyiceberg.catalog")
+    catalog = _get_catalog(catalog_name, catalog_properties)
     pyiceberg_expressions = import_optional_dependency("pyiceberg.expressions")
-
-    if catalog_properties is None:
-        catalog_properties = {}
-    catalog = pyiceberg_catalog.load_catalog(catalog_name, **catalog_properties)
     table = catalog.load_table(table_identifier)
     if row_filter is None:
         row_filter = pyiceberg_expressions.AlwaysTrue()
@@ -91,3 +95,56 @@ def read_iceberg(
         limit=limit,
     )
     return result.to_pandas()
+
+
+def to_iceberg(
+    df: DataFrame,
+    table_identifier: str,
+    catalog_name: str | None = None,
+    *,
+    catalog_properties: dict[str, Any] | None = None,
+    location: str | None = None,
+    snapshot_properties: dict[str, str] | None = None,
+):
+    """
+    Write a DataFrame to an Apache Iceberg table.
+
+    .. versionadded:: 3.0.0
+
+    Parameters
+    ----------
+    table_identifier : str
+        Table identifier.
+    catalog_name : str, optional
+        The name of the catalog.
+    catalog_properties : dict of {str: str}, optional
+        The properties that are used next to the catalog configuration.
+    location : str, optional
+        Location for the table.
+    snapshot_properties : dict of {str: str}, optional
+        Custom properties to be added to the snapshot summary
+
+    See Also
+    --------
+    read_iceberg : Read an Apache Iceberg table.
+    DataFrame.to_parquet : Write a DataFrame in Parquet format.
+
+    Examples
+    --------
+    >>> df = pd.DataFrame(data={"col1": [1, 2], "col2": [4, 3]})
+    >>> df.to_iceberg("my_table", catalog_name="my_catalog")
+    """
+    pa = import_optional_dependency("pyarrow")
+
+    catalog = _get_catalog(catalog_name, catalog_properties)
+    arrow_table = pa.Table.from_pandas(df)
+    table = catalog.create_table_if_not_exists(
+        identifier=table_identifier,
+        schema=arrow_table.schema,
+        location=location,
+        # we could add `partition_spec`, `sort_order` and `properties` in the
+        # future, but it may not be trivial without exposing PyIceberg objects
+    )
+    if snapshot_properties is None:
+        snapshot_properties = {}
+    table.append(arrow_table, snapshot_properties=snapshot_properties)
diff --git a/pandas/tests/io/test_iceberg.py b/pandas/tests/io/test_iceberg.py
index 765eccb602434..c0597c6f79009 100644
--- a/pandas/tests/io/test_iceberg.py
+++ b/pandas/tests/io/test_iceberg.py
@@ -22,7 +22,7 @@
 pyiceberg_catalog = pytest.importorskip("pyiceberg.catalog")
 pq = pytest.importorskip("pyarrow.parquet")

-Catalog = collections.namedtuple("Catalog", ["name", "uri"])
+Catalog = collections.namedtuple("Catalog", ["name", "uri", "warehouse"])


 @pytest.fixture
@@ -58,7 +58,7 @@ def catalog(request, tmp_path):

     importlib.reload(pyiceberg_catalog)  # needed to reload the config file

-    yield Catalog(name=catalog_name or "default", uri=uri)
+    yield Catalog(name=catalog_name or "default", uri=uri, warehouse=warehouse)

     if catalog_name is not None:
         config_path.unlink()
@@ -141,3 +141,39 @@ def test_read_with_limit(self, catalog):
             limit=2,
         )
         tm.assert_frame_equal(result, expected)
+
+    def test_write(self, catalog):
+        df = pd.DataFrame(
+            {
+                "A": [1, 2, 3],
+                "B": ["foo", "foo", "foo"],
+            }
+        )
+        df.to_iceberg(
+            "ns.new_table",
+            catalog_properties={"uri": catalog.uri},
+            location=catalog.warehouse,
+        )
+        result = read_iceberg(
+            "ns.new_table",
+            catalog_properties={"uri": catalog.uri},
+        )
+        tm.assert_frame_equal(result, df)
+
+    @pytest.mark.parametrize("catalog", ["default", "pandas_tests"], indirect=True)
+    def test_write_by_catalog_name(self, catalog):
+        df = pd.DataFrame(
+            {
+                "A": [1, 2, 3],
+                "B": ["foo", "foo", "foo"],
+            }
+        )
+        df.to_iceberg(
+            "ns.new_table",
+            catalog_name=catalog.name,
+        )
+        result = read_iceberg(
+            "ns.new_table",
+            catalog_name=catalog.name,
+        )
+        tm.assert_frame_equal(result, df)

From 373138c4830a97c9fd4c7f8dfe32e02fe402b207 Mon Sep 17 00:00:00 2001
From: Marc Garcia
Date: Tue, 27 May 2025 23:35:39 +0200
Subject: [PATCH 03/10] Typo in whatsnew

---
 doc/source/whatsnew/v3.0.0.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/doc/source/whatsnew/v3.0.0.rst b/doc/source/whatsnew/v3.0.0.rst
index e93c7d15986d4..aae0ee1652feb 100644
--- a/doc/source/whatsnew/v3.0.0.rst
+++ b/doc/source/whatsnew/v3.0.0.rst
@@ -78,7 +78,7 @@ Other enhancements
 - :py:class:`frozenset` elements in pandas objects are now natively printed (:issue:`60690`)
 - Add ``"delete_rows"`` option to ``if_exists`` argument in :meth:`DataFrame.to_sql` deleting all records of the table before inserting data (:issue:`37210`).
 - Added half-year offset classes :class:`HalfYearBegin`, :class:`HalfYearEnd`, :class:`BHalfYearBegin` and :class:`BHalfYearEnd` (:issue:`60928`)
-- Added support to read and write from and to Apache Iceberg tables with the new :func:`read_iceberg` :meth:`DataFrame.to_iceberg` functions (:issue:`61383`)
+- Added support to read and write from and to Apache Iceberg tables with the new :func:`read_iceberg` and :meth:`DataFrame.to_iceberg` functions (:issue:`61383`)
 - Errors occurring during SQL I/O will now throw a generic :class:`.DatabaseError` instead of the raw Exception type from the underlying driver manager library (:issue:`60748`)
 - Implemented :meth:`Series.str.isascii` and :meth:`Series.str.isascii` (:issue:`59091`)
 - Improved deprecation message for offset aliases (:issue:`60820`)

From a83c62fdbfeea52c438d61711c7ce53f9a41f019 Mon Sep 17 00:00:00 2001
From: Marc Garcia
Date: Wed, 28 May 2025 00:11:10 +0200
Subject: [PATCH 04/10] Fix CI and app test

---
 pandas/core/frame.py            |  6 +++++-
 pandas/io/iceberg.py            | 15 ++++++++-------
 pandas/tests/io/test_iceberg.py | 22 ++++++++++++++++++++++
 3 files changed, 35 insertions(+), 8 deletions(-)

diff --git a/pandas/core/frame.py b/pandas/core/frame.py
index 68ce3baa959b3..c7a2bd9860453 100644
--- a/pandas/core/frame.py
+++ b/pandas/core/frame.py
@@ -3574,6 +3574,10 @@ def to_iceberg(
         snapshot_properties : dict of {str: str}, optional
             Custom properties to be added to the snapshot summary

+        Returns
+        -------
+        None
+
         See Also
         --------
         read_iceberg : Read an Apache Iceberg table.
@@ -3582,7 +3586,7 @@ def to_iceberg(

         Examples
         --------
         >>> df = pd.DataFrame(data={"col1": [1, 2], "col2": [4, 3]})
-        >>> df.to_iceberg("my_table", catalog_name="my_catalog")
+        >>> df.to_iceberg("my_table", catalog_name="my_catalog")  # doctest: +SKIP
         """
         from pandas.io.iceberg import to_iceberg
diff --git a/pandas/io/iceberg.py b/pandas/io/iceberg.py
index f0c2e9516dc30..1da541097c426 100644
--- a/pandas/io/iceberg.py
+++ b/pandas/io/iceberg.py
@@ -1,4 +1,5 @@
 from typing import (
+    TYPE_CHECKING,
     Any,
 )

@@ -6,8 +7,13 @@
 from pandas import DataFrame

+if TYPE_CHECKING:
+    from pyiceberg.catalog import Catalog  # noqa: TC004

-def _get_catalog(catalog_name: str | None, catalog_properties: dict[str, Any] | None):
+
+def _get_catalog(
+    catalog_name: str | None, catalog_properties: dict[str, Any] | None
+) -> Catalog:
     pyiceberg_catalog = import_optional_dependency("pyiceberg.catalog")
     if catalog_properties is None:
         catalog_properties = {}
     return pyiceberg_catalog.load_catalog(catalog_name, **catalog_properties)
@@ -105,7 +111,7 @@ def to_iceberg(
     catalog_properties: dict[str, Any] | None = None,
     location: str | None = None,
     snapshot_properties: dict[str, str] | None = None,
-):
+) -> None:
     """
     Write a DataFrame to an Apache Iceberg table.

@@ -128,11 +134,6 @@ def to_iceberg(
     --------
     read_iceberg : Read an Apache Iceberg table.
     DataFrame.to_parquet : Write a DataFrame in Parquet format.
-
-    Examples
-    --------
-    >>> df = pd.DataFrame(data={"col1": [1, 2], "col2": [4, 3]})
-    >>> df.to_iceberg("my_table", catalog_name="my_catalog")
     """
     pa = import_optional_dependency("pyarrow")

diff --git a/pandas/tests/io/test_iceberg.py b/pandas/tests/io/test_iceberg.py
index c0597c6f79009..937696bcc98fc 100644
--- a/pandas/tests/io/test_iceberg.py
+++ b/pandas/tests/io/test_iceberg.py
@@ -177,3 +177,25 @@ def test_write_by_catalog_name(self, catalog):
             catalog_name=catalog.name,
         )
         tm.assert_frame_equal(result, df)
+
+    def test_write_existing_table(self, catalog):
+        original = read_iceberg(
+            "ns.my_table",
+            catalog_properties={"uri": catalog.uri},
+        )
+        new = pd.DataFrame(
+            {
+                "A": [4, 5],
+                "B": ["bar", "foobar"],
+            }
+        )
+        new.to_iceberg(
+            "ns.my_table",
+            catalog_properties={"uri": catalog.uri},
+            location=catalog.warehouse,
+        )
+        result = read_iceberg(
+            "ns.my_table",
+            catalog_properties={"uri": catalog.uri},
+        )
+        tm.assert_frame_equal(result, pd.concat([original, new]))

From 4246d177f179f5247b8b172b2acfa69559cb98cd Mon Sep 17 00:00:00 2001
From: Marc Garcia
Date: Wed, 28 May 2025 00:30:16 +0200
Subject: [PATCH 05/10] Fix CI, fix existing table test

---
 pandas/io/iceberg.py            | 4 ++--
 pandas/tests/io/test_iceberg.py | 7 ++++---
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/pandas/io/iceberg.py b/pandas/io/iceberg.py
index 1da541097c426..89a66fd5127e3 100644
--- a/pandas/io/iceberg.py
+++ b/pandas/io/iceberg.py
@@ -8,12 +8,12 @@
 from pandas import DataFrame

 if TYPE_CHECKING:
-    from pyiceberg.catalog import Catalog  # noqa: TC004
+    from pyiceberg.catalog import Catalog


 def _get_catalog(
     catalog_name: str | None, catalog_properties: dict[str, Any] | None
-) -> Catalog:
+) -> "Catalog":
     pyiceberg_catalog = import_optional_dependency("pyiceberg.catalog")
     if catalog_properties is None:
         catalog_properties = {}
diff --git a/pandas/tests/io/test_iceberg.py b/pandas/tests/io/test_iceberg.py
index 937696bcc98fc..7c73f8a25c0d7 100644
--- a/pandas/tests/io/test_iceberg.py
+++ b/pandas/tests/io/test_iceberg.py
@@ -185,10 +185,11 @@ def test_write_existing_table(self, catalog):
         )
         new = pd.DataFrame(
             {
-                "A": [4, 5],
-                "B": ["bar", "foobar"],
+                "A": [1, 2, 3],
+                "B": ["foo", "foo", "foo"],
             }
         )
+        expected = pd.concat([original, new], ignore_index=True)
         new.to_iceberg(
             "ns.my_table",
             catalog_properties={"uri": catalog.uri},
@@ -198,4 +199,4 @@ def test_write_existing_table(self, catalog):
             "ns.my_table",
             catalog_properties={"uri": catalog.uri},
         )
-        tm.assert_frame_equal(result, pd.concat([original, new]))
+        tm.assert_frame_equal(result, expected)

From fd728e01163fbeedfd2851b326ceef82769f15ee Mon Sep 17 00:00:00 2001
From: Marc Garcia
Date: Wed, 28 May 2025 01:04:49 +0200
Subject: [PATCH 06/10] typing of to_iceberg

---
 pandas/core/frame.py | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/pandas/core/frame.py b/pandas/core/frame.py
index c7a2bd9860453..73f747259d170 100644
--- a/pandas/core/frame.py
+++ b/pandas/core/frame.py
@@ -3555,7 +3555,7 @@ def to_iceberg(
         catalog_properties: dict[str, Any] | None = None,
         location: str | None = None,
         snapshot_properties: dict[str, str] | None = None,
-    ):
+    ) -> None:
         """
         Write a DataFrame to an Apache Iceberg table.

@@ -3574,10 +3574,6 @@ def to_iceberg(
         snapshot_properties : dict of {str: str}, optional
             Custom properties to be added to the snapshot summary

-        Returns
-        -------
-        None
-
         See Also
         --------
         read_iceberg : Read an Apache Iceberg table.
         DataFrame.to_parquet : Write a DataFrame in Parquet format.

From f926ffd048d57f93aeb2513e375c2dc175adf4f1 Mon Sep 17 00:00:00 2001
From: Marc Garcia
Date: Wed, 28 May 2025 13:23:22 +0200
Subject: [PATCH 07/10] Fix docstring validation error

---
 pandas/core/frame.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pandas/core/frame.py b/pandas/core/frame.py
index 73f747259d170..70b492f6d05cc 100644
--- a/pandas/core/frame.py
+++ b/pandas/core/frame.py
@@ -3586,7 +3586,7 @@ def to_iceberg(
         """
         from pandas.io.iceberg import to_iceberg

-        return to_iceberg(
+        to_iceberg(
             self,
             table_identifier,
             catalog_name,

From 1290931e834460221fd747d0ee7488816b25c616 Mon Sep 17 00:00:00 2001
From: Marc Garcia
Date: Wed, 28 May 2025 13:31:44 +0200
Subject: [PATCH 08/10] Simplifying getting the catalog, to make linting easier

---
 pandas/io/iceberg.py | 24 ++++++++---------------
 1 file changed, 8 insertions(+), 16 deletions(-)

diff --git a/pandas/io/iceberg.py b/pandas/io/iceberg.py
index 89a66fd5127e3..31032a1ad84be 100644
--- a/pandas/io/iceberg.py
+++ b/pandas/io/iceberg.py
@@ -1,5 +1,4 @@
 from typing import (
-    TYPE_CHECKING,
     Any,
 )

@@ -7,18 +6,6 @@
 from pandas import DataFrame

-if TYPE_CHECKING:
-    from pyiceberg.catalog import Catalog
-
-
-def _get_catalog(
-    catalog_name: str | None, catalog_properties: dict[str, Any] | None
-) -> "Catalog":
-    pyiceberg_catalog = import_optional_dependency("pyiceberg.catalog")
-    if catalog_properties is None:
-        catalog_properties = {}
-    return pyiceberg_catalog.load_catalog(catalog_name, **catalog_properties)
-

 def read_iceberg(
     table_identifier: str,
     catalog_name: str | None = None,
@@ -83,8 +70,11 @@ def read_iceberg(
     ...     selected_fields=("VendorID", "tpep_pickup_datetime"),
     ... )  # doctest: +SKIP
     """
-    catalog = _get_catalog(catalog_name, catalog_properties)
+    pyiceberg_catalog = import_optional_dependency("pyiceberg.catalog")
     pyiceberg_expressions = import_optional_dependency("pyiceberg.expressions")
+    if catalog_properties is None:
+        catalog_properties = {}
+    catalog = pyiceberg_catalog.load_catalog(catalog_name, **catalog_properties)
     table = catalog.load_table(table_identifier)
     if row_filter is None:
         row_filter = pyiceberg_expressions.AlwaysTrue()
@@ -136,8 +126,10 @@ def to_iceberg(
     DataFrame.to_parquet : Write a DataFrame in Parquet format.
     """
     pa = import_optional_dependency("pyarrow")
-
-    catalog = _get_catalog(catalog_name, catalog_properties)
+    pyiceberg_catalog = import_optional_dependency("pyiceberg.catalog")
+    if catalog_properties is None:
+        catalog_properties = {}
+    catalog = pyiceberg_catalog.load_catalog(catalog_name, **catalog_properties)
     arrow_table = pa.Table.from_pandas(df)
     table = catalog.create_table_if_not_exists(
         identifier=table_identifier,

From 43255a729174ea1eb0d47b2415569b534ec6edec Mon Sep 17 00:00:00 2001
From: Marc Garcia
Date: Mon, 2 Jun 2025 23:32:37 +0400
Subject: [PATCH 09/10] Adding experimental note, fixing typos and adding user guide docs

---
 doc/source/user_guide/io.rst | 25 ++++++++++++++++++++++++-
 pandas/core/frame.py         |  4 ++++
 pandas/io/iceberg.py         |  2 ++
 3 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/doc/source/user_guide/io.rst b/doc/source/user_guide/io.rst
index 3e09027f7ada8..5a7dc36831ca1 100644
--- a/doc/source/user_guide/io.rst
+++ b/doc/source/user_guide/io.rst
@@ -5417,7 +5417,7 @@ engines to safely work with the same tables at the same time.
 Iceberg support predicate pushdown and column pruning, which are available to pandas
 users via the ``row_filter`` and ``selected_fields`` parameters of the :func:`~pandas.read_iceberg`
-function. This is convenient to extract from large tables a subset that fits in memory asa
+function. This is convenient to extract from large tables a subset that fits in memory as a
 pandas ``DataFrame``.

 Internally, pandas uses PyIceberg_ to query Iceberg.
@@ -5497,6 +5497,29 @@ parameter:
 Reading a particular snapshot is also possible providing the snapshot ID as an argument to
 ``snapshot_id``.

+To save a ``DataFrame`` to Iceberg, it can be done with the :meth:`DataFrame.to_iceberg`
+method:
+
+.. code-block:: python
+
+    df.to_iceberg("my_table", catalog_name="my_catalog")
+
+To specify the catalog, it works in the same way as for :func:`read_iceberg` with the
+``catalog_name`` and ``catalog_properties`` parameters.
+
+The location of the table can be specified with the ``location`` parameter:
+
+.. code-block:: python
+
+    df.to_iceberg(
+        "my_table",
+        catalog_name="my_catalog",
+        location="s://my-data-lake/my-iceberg-tables",
+    )
+
+It is possible to add properties to the table snapshot by passing a dictionary to the
+``snapshot_properties`` parameter.
+
 More information about the Iceberg format can be found in the `Apache Iceberg official page `__.
diff --git a/pandas/core/frame.py b/pandas/core/frame.py
index 70b492f6d05cc..2ad1029ee9bf1 100644
--- a/pandas/core/frame.py
+++ b/pandas/core/frame.py
@@ -3561,6 +3561,10 @@ def to_iceberg(

         .. versionadded:: 3.0.0

+        .. warning::
+
+            to_iceberg is experimental and may change without warning.
+
         Parameters
         ----------
         table_identifier : str
diff --git a/pandas/io/iceberg.py b/pandas/io/iceberg.py
index 31032a1ad84be..693355e49325d 100644
--- a/pandas/io/iceberg.py
+++ b/pandas/io/iceberg.py
@@ -22,6 +22,8 @@ def read_iceberg(
     """
     Read an Apache Iceberg table into a pandas DataFrame.

+    .. versionadded:: 3.0.0
+
     .. warning::

         read_iceberg is experimental and may change without warning.
From 21e6e9bc6005a5aacdb7ec33281e3fdb84d51633 Mon Sep 17 00:00:00 2001
From: Marc Garcia
Date: Wed, 4 Jun 2025 14:10:33 +0400
Subject: [PATCH 10/10] Add append parameter

---
 pandas/core/frame.py            |  4 ++++
 pandas/io/iceberg.py            |  8 +++++++-
 pandas/tests/io/test_iceberg.py | 22 +++++++++++++++++++++-
 3 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/pandas/core/frame.py b/pandas/core/frame.py
index 229e8a7d251c7..8053c17437c5e 100644
--- a/pandas/core/frame.py
+++ b/pandas/core/frame.py
@@ -3554,6 +3554,7 @@ def to_iceberg(
         *,
         catalog_properties: dict[str, Any] | None = None,
         location: str | None = None,
+        append: bool = False,
         snapshot_properties: dict[str, str] | None = None,
     ) -> None:
         """
@@ -3575,6 +3576,8 @@ def to_iceberg(
             The properties that are used next to the catalog configuration.
         location : str, optional
             Location for the table.
+        append : bool, default False
+            If ``True``, append data to the table, instead of replacing the content.
         snapshot_properties : dict of {str: str}, optional
             Custom properties to be added to the snapshot summary

@@ -3596,6 +3599,7 @@ def to_iceberg(
             catalog_name,
             catalog_properties=catalog_properties,
             location=location,
+            append=append,
             snapshot_properties=snapshot_properties,
         )

diff --git a/pandas/io/iceberg.py b/pandas/io/iceberg.py
index 693355e49325d..dcb675271031e 100644
--- a/pandas/io/iceberg.py
+++ b/pandas/io/iceberg.py
@@ -102,6 +102,7 @@ def to_iceberg(
     *,
     catalog_properties: dict[str, Any] | None = None,
     location: str | None = None,
+    append: bool = False,
     snapshot_properties: dict[str, str] | None = None,
 ) -> None:
     """
@@ -119,6 +120,8 @@ def to_iceberg(
         The properties that are used next to the catalog configuration.
     location : str, optional
         Location for the table.
+    append : bool, default False
+        If ``True``, append data to the table, instead of replacing the content.
     snapshot_properties : dict of {str: str}, optional
         Custom properties to be added to the snapshot summary

@@ -142,4 +145,7 @@ def to_iceberg(
     )
     if snapshot_properties is None:
         snapshot_properties = {}
-    table.append(arrow_table, snapshot_properties=snapshot_properties)
+    if append:
+        table.append(arrow_table, snapshot_properties=snapshot_properties)
+    else:
+        table.overwrite(arrow_table, snapshot_properties=snapshot_properties)
diff --git a/pandas/tests/io/test_iceberg.py b/pandas/tests/io/test_iceberg.py
index 7c73f8a25c0d7..916c1d2af9b12 100644
--- a/pandas/tests/io/test_iceberg.py
+++ b/pandas/tests/io/test_iceberg.py
@@ -178,7 +178,7 @@ def test_write_by_catalog_name(self, catalog):
         )
         tm.assert_frame_equal(result, df)

-    def test_write_existing_table(self, catalog):
+    def test_write_existing_table_with_append_true(self, catalog):
         original = read_iceberg(
             "ns.my_table",
             catalog_properties={"uri": catalog.uri},
@@ -194,9 +194,29 @@ def test_write_existing_table(self, catalog):
             "ns.my_table",
             catalog_properties={"uri": catalog.uri},
             location=catalog.warehouse,
+            append=True,
         )
         result = read_iceberg(
             "ns.my_table",
             catalog_properties={"uri": catalog.uri},
         )
         tm.assert_frame_equal(result, expected)
+
+    def test_write_existing_table_with_append_false(self, catalog):
+        df = pd.DataFrame(
+            {
+                "A": [1, 2, 3],
+                "B": ["foo", "foo", "foo"],
+            }
+        )
+        df.to_iceberg(
+            "ns.my_table",
+            catalog_properties={"uri": catalog.uri},
+            location=catalog.warehouse,
+            append=False,
+        )
+        result = read_iceberg(
+            "ns.my_table",
+            catalog_properties={"uri": catalog.uri},
+        )
+        tm.assert_frame_equal(result, df)
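
The following is a minimal usage sketch of the API this series adds, not part of the patches themselves. The catalog name ``my_catalog`` and the table identifier ``ns.my_table`` are illustrative, and it assumes ``pyiceberg`` and ``pyarrow`` are installed and that a PyIceberg catalog with that name is already configured (for example through PyIceberg's configuration file).

.. code-block:: python

    import pandas as pd

    df = pd.DataFrame({"A": [1, 2, 3], "B": ["foo", "foo", "foo"]})

    # Create the table or replace its contents; with append=False (the default)
    # to_iceberg calls Table.overwrite, as added in PATCH 10/10.
    df.to_iceberg("ns.my_table", catalog_name="my_catalog")

    # Append to the existing table instead, attaching illustrative snapshot properties.
    df.to_iceberg(
        "ns.my_table",
        catalog_name="my_catalog",
        append=True,
        snapshot_properties={"written-by": "pandas"},
    )

    # Read the table back into a DataFrame.
    result = pd.read_iceberg("ns.my_table", catalog_name="my_catalog")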