From 48d9edc18577fcca55afb4bc6a71f7436fa649a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20Jos=C3=A9=20Pereira=20Vieito?= <pvieito@gmail.com> Date: Thu, 16 Jan 2025 17:48:45 +0100 Subject: [PATCH 01/18] Update _write.py --- awswrangler/s3/_write.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/awswrangler/s3/_write.py b/awswrangler/s3/_write.py index 80840ce05..16ea2b3ad 100644 --- a/awswrangler/s3/_write.py +++ b/awswrangler/s3/_write.py @@ -277,9 +277,20 @@ def write( # noqa: PLR0913 partition_cols = partition_cols if partition_cols else [] dtype = dtype if dtype else {} partitions_values: dict[str, list[str]] = {} - mode = "append" if mode is None else mode - filename_prefix = filename_prefix + uuid.uuid4().hex if filename_prefix else uuid.uuid4().hex + if mode == "overwrite_files": + if filename_prefix is None: + filename_prefix = "part" + random_filename_suffix = "" + mode = "append" + else: + random_filename_suffix = uuid.uuid4().hex + + if filename_prefix is None: + filename_prefix = "" + filename_prefix = filename_prefix + random_filename_suffix + + mode = "append" if mode is None else mode cpus: int = _utils.ensure_cpu_count(use_threads=use_threads) s3_client = _utils.client(service_name="s3", session=boto3_session) From e23e1a83974bc10af396f55cf1e976d158db12a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20Jose=CC=81=20Pereira=20Vieito?= <pvieito@gmail.com> Date: Thu, 16 Jan 2025 17:58:01 +0100 Subject: [PATCH 02/18] =?UTF-8?q?Initial=20implementation=20of=20=E2=80=9C?= =?UTF-8?q?overwrite=5Ffiles=E2=80=9D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- awswrangler/s3/_write.py | 2 ++ awswrangler/s3/_write_orc.py | 2 +- awswrangler/s3/_write_parquet.py | 2 +- awswrangler/s3/_write_text.py | 4 ++-- 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/awswrangler/s3/_write.py b/awswrangler/s3/_write.py index 16ea2b3ad..e960bc0b4 100644 --- a/awswrangler/s3/_write.py +++ b/awswrangler/s3/_write.py @@ -279,6 +279,8 @@ def write( # noqa: PLR0913 partitions_values: dict[str, list[str]] = {} if mode == "overwrite_files": + assert max_rows_by_file in [None, 0] + if filename_prefix is None: filename_prefix = "part" random_filename_suffix = "" diff --git a/awswrangler/s3/_write_orc.py b/awswrangler/s3/_write_orc.py index f3038a2e3..84c63e578 100644 --- a/awswrangler/s3/_write_orc.py +++ b/awswrangler/s3/_write_orc.py @@ -326,7 +326,7 @@ def to_orc( partition_cols: list[str] | None = None, bucketing_info: BucketingInfoTuple | None = None, concurrent_partitioning: bool = False, - mode: Literal["append", "overwrite", "overwrite_partitions"] | None = None, + mode: Literal["append", "overwrite", "overwrite_partitions", "overwrite_files"] | None = None, catalog_versioning: bool = False, schema_evolution: bool = True, database: str | None = None, diff --git a/awswrangler/s3/_write_parquet.py b/awswrangler/s3/_write_parquet.py index d30fc6d60..944b78e7f 100644 --- a/awswrangler/s3/_write_parquet.py +++ b/awswrangler/s3/_write_parquet.py @@ -353,7 +353,7 @@ def to_parquet( partition_cols: list[str] | None = None, bucketing_info: BucketingInfoTuple | None = None, concurrent_partitioning: bool = False, - mode: Literal["append", "overwrite", "overwrite_partitions"] | None = None, + mode: Literal["append", "overwrite", "overwrite_partitions", "overwrite_files"] | None = None, catalog_versioning: bool = False, schema_evolution: bool = True, database: str | None = None, diff --git 
a/awswrangler/s3/_write_text.py b/awswrangler/s3/_write_text.py index 4911e7696..b954ab6be 100644 --- a/awswrangler/s3/_write_text.py +++ b/awswrangler/s3/_write_text.py @@ -98,7 +98,7 @@ def to_csv( # noqa: PLR0912,PLR0915 partition_cols: list[str] | None = None, bucketing_info: BucketingInfoTuple | None = None, concurrent_partitioning: bool = False, - mode: Literal["append", "overwrite", "overwrite_partitions"] | None = None, + mode: Literal["append", "overwrite", "overwrite_partitions", "overwrite_files"] | None = None, catalog_versioning: bool = False, schema_evolution: bool = False, dtype: dict[str, str] | None = None, @@ -661,7 +661,7 @@ def to_json( # noqa: PLR0912,PLR0915 partition_cols: list[str] | None = None, bucketing_info: BucketingInfoTuple | None = None, concurrent_partitioning: bool = False, - mode: Literal["append", "overwrite", "overwrite_partitions"] | None = None, + mode: Literal["append", "overwrite", "overwrite_partitions", "overwrite_files"] | None = None, catalog_versioning: bool = False, schema_evolution: bool = True, dtype: dict[str, str] | None = None, From cb7fb672098dc938431a958917d50a45d70d24cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20Jose=CC=81=20Pereira=20Vieito?= <pvieito@gmail.com> Date: Fri, 17 Jan 2025 10:17:38 +0100 Subject: [PATCH 03/18] =?UTF-8?q?Updated=20implementation=20of=20=E2=80=9C?= =?UTF-8?q?overwrite=5Ffiles=E2=80=9D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- awswrangler/s3/_write.py | 38 +++++++++++++++++++------------- awswrangler/s3/_write_orc.py | 1 + awswrangler/s3/_write_parquet.py | 1 + 3 files changed, 25 insertions(+), 15 deletions(-) diff --git a/awswrangler/s3/_write.py b/awswrangler/s3/_write.py index e960bc0b4..5165efc4c 100644 --- a/awswrangler/s3/_write.py +++ b/awswrangler/s3/_write.py @@ -6,7 +6,7 @@ import uuid from abc import ABC, abstractmethod from enum import Enum -from typing import TYPE_CHECKING, Any, Callable, NamedTuple +from typing import TYPE_CHECKING, Any, Callable, NamedTuple, Tuple import boto3 import pandas as pd @@ -35,6 +35,21 @@ } +def _compose_filename_prefix_for_mode(*, mode: str, filename_prefix: str = None) -> Tuple[str, str]: + if mode == "overwrite_files": + if filename_prefix is None: + filename_prefix = "part" + random_filename_suffix = "" + mode = "append" + else: + random_filename_suffix = uuid.uuid4().hex + + if filename_prefix is None: + filename_prefix = "" + filename_prefix = filename_prefix + random_filename_suffix + return filename_prefix, mode + + def _extract_dtypes_from_table_input(table_input: dict[str, Any]) -> dict[str, str]: dtypes: dict[str, str] = {} for col in table_input["StorageDescriptor"]["Columns"]: @@ -71,6 +86,7 @@ def _validate_args( parameters: dict[str, str] | None, columns_comments: dict[str, str] | None, columns_parameters: dict[str, dict[str, str]] | None, + max_rows_by_file: int | None, execution_engine: Enum, ) -> None: if df.empty is True: @@ -88,6 +104,10 @@ def _validate_args( raise exceptions.InvalidArgumentCombination("Please, pass dataset=True to be able to use bucketing_info.") if mode is not None: raise exceptions.InvalidArgumentCombination("Please pass dataset=True to be able to use mode.") + if mode == "overwrite_files" and (max_rows_by_file or bucketing_info): + raise exceptions.InvalidArgumentValue( + "When mode is set to 'overwrite_files', the " + "`max_rows_by_file` and `bucketing_info` arguments cannot be set.") if any(arg is not None for arg in (table, description, parameters, 
columns_comments, columns_parameters)): raise exceptions.InvalidArgumentCombination( "Please pass dataset=True to be able to use any one of these " @@ -278,20 +298,8 @@ def write( # noqa: PLR0913 dtype = dtype if dtype else {} partitions_values: dict[str, list[str]] = {} - if mode == "overwrite_files": - assert max_rows_by_file in [None, 0] - - if filename_prefix is None: - filename_prefix = "part" - random_filename_suffix = "" - mode = "append" - else: - random_filename_suffix = uuid.uuid4().hex - - if filename_prefix is None: - filename_prefix = "" - filename_prefix = filename_prefix + random_filename_suffix - + mode, filename_prefix = _compose_filename_prefix_for_mode( + mode=mode, filename_prefix=filename_prefix) mode = "append" if mode is None else mode cpus: int = _utils.ensure_cpu_count(use_threads=use_threads) s3_client = _utils.client(service_name="s3", session=boto3_session) diff --git a/awswrangler/s3/_write_orc.py b/awswrangler/s3/_write_orc.py index 84c63e578..94b76bef3 100644 --- a/awswrangler/s3/_write_orc.py +++ b/awswrangler/s3/_write_orc.py @@ -645,6 +645,7 @@ def to_orc( parameters=parameters, columns_comments=columns_comments, columns_parameters=columns_parameters, + max_rows_by_file=max_rows_by_file, execution_engine=engine.get(), ) diff --git a/awswrangler/s3/_write_parquet.py b/awswrangler/s3/_write_parquet.py index 944b78e7f..5f19ca258 100644 --- a/awswrangler/s3/_write_parquet.py +++ b/awswrangler/s3/_write_parquet.py @@ -703,6 +703,7 @@ def to_parquet( parameters=parameters, columns_comments=columns_comments, columns_parameters=columns_parameters, + max_rows_by_file=max_rows_by_file, execution_engine=engine.get(), ) From 1b6f89208fba30e73c448a510b0ea7c8be90d9c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20Jose=CC=81=20Pereira=20Vieito?= <pvieito@gmail.com> Date: Fri, 17 Jan 2025 10:18:10 +0100 Subject: [PATCH 04/18] =?UTF-8?q?Updated=20implementation=20of=20=E2=80=9C?= =?UTF-8?q?overwrite=5Ffiles=E2=80=9D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- awswrangler/s3/_write.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awswrangler/s3/_write.py b/awswrangler/s3/_write.py index 5165efc4c..6532b2251 100644 --- a/awswrangler/s3/_write.py +++ b/awswrangler/s3/_write.py @@ -47,7 +47,7 @@ def _compose_filename_prefix_for_mode(*, mode: str, filename_prefix: str = None) if filename_prefix is None: filename_prefix = "" filename_prefix = filename_prefix + random_filename_suffix - return filename_prefix, mode + return mode, filename_prefix def _extract_dtypes_from_table_input(table_input: dict[str, Any]) -> dict[str, str]: From 10e30c11186107839470c3b1f01e075ea132e0de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20Jose=CC=81=20Pereira=20Vieito?= <pvieito@gmail.com> Date: Fri, 17 Jan 2025 10:25:59 +0100 Subject: [PATCH 05/18] =?UTF-8?q?Updated=20implementation=20of=20=E2=80=9C?= =?UTF-8?q?overwrite=5Ffiles=E2=80=9D=20mode?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- awswrangler/s3/_write.py | 2 +- awswrangler/s3/_write_orc.py | 2 +- awswrangler/s3/_write_parquet.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/awswrangler/s3/_write.py b/awswrangler/s3/_write.py index 6532b2251..39682455f 100644 --- a/awswrangler/s3/_write.py +++ b/awswrangler/s3/_write.py @@ -86,8 +86,8 @@ def _validate_args( parameters: dict[str, str] | None, columns_comments: dict[str, str] | None, columns_parameters: dict[str, dict[str, str]] | None, - 
max_rows_by_file: int | None, execution_engine: Enum, + max_rows_by_file: int | None = None, ) -> None: if df.empty is True: _logger.warning("Empty DataFrame will be written.") diff --git a/awswrangler/s3/_write_orc.py b/awswrangler/s3/_write_orc.py index 94b76bef3..7d1061874 100644 --- a/awswrangler/s3/_write_orc.py +++ b/awswrangler/s3/_write_orc.py @@ -645,8 +645,8 @@ def to_orc( parameters=parameters, columns_comments=columns_comments, columns_parameters=columns_parameters, - max_rows_by_file=max_rows_by_file, execution_engine=engine.get(), + max_rows_by_file=max_rows_by_file, ) # Evaluating compression diff --git a/awswrangler/s3/_write_parquet.py b/awswrangler/s3/_write_parquet.py index 5f19ca258..835d2ebfd 100644 --- a/awswrangler/s3/_write_parquet.py +++ b/awswrangler/s3/_write_parquet.py @@ -703,8 +703,8 @@ def to_parquet( parameters=parameters, columns_comments=columns_comments, columns_parameters=columns_parameters, - max_rows_by_file=max_rows_by_file, execution_engine=engine.get(), + max_rows_by_file=max_rows_by_file, ) # Evaluating compression From 2f31d7d66ca1fcddcf0e4041e0d6c7c317d727b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20Jose=CC=81=20Pereira=20Vieito?= <pvieito@gmail.com> Date: Fri, 17 Jan 2025 10:30:39 +0100 Subject: [PATCH 06/18] =?UTF-8?q?Updated=20implementation=20of=20=E2=80=9C?= =?UTF-8?q?overwrite=5Ffiles=E2=80=9D=20mode?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- awswrangler/s3/_write.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/awswrangler/s3/_write.py b/awswrangler/s3/_write.py index 39682455f..6cc120da8 100644 --- a/awswrangler/s3/_write.py +++ b/awswrangler/s3/_write.py @@ -107,7 +107,8 @@ def _validate_args( if mode == "overwrite_files" and (max_rows_by_file or bucketing_info): raise exceptions.InvalidArgumentValue( "When mode is set to 'overwrite_files', the " - "`max_rows_by_file` and `bucketing_info` arguments cannot be set.") + "`max_rows_by_file` and `bucketing_info` arguments cannot be set." 
+ ) if any(arg is not None for arg in (table, description, parameters, columns_comments, columns_parameters)): raise exceptions.InvalidArgumentCombination( "Please pass dataset=True to be able to use any one of these " @@ -298,8 +299,7 @@ def write( # noqa: PLR0913 dtype = dtype if dtype else {} partitions_values: dict[str, list[str]] = {} - mode, filename_prefix = _compose_filename_prefix_for_mode( - mode=mode, filename_prefix=filename_prefix) + mode, filename_prefix = _compose_filename_prefix_for_mode(mode=mode, filename_prefix=filename_prefix) mode = "append" if mode is None else mode cpus: int = _utils.ensure_cpu_count(use_threads=use_threads) s3_client = _utils.client(service_name="s3", session=boto3_session) From 83d7683029681baaded898338d9c6417ab6553ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20Jose=CC=81=20Pereira=20Vieito?= <pvieito@gmail.com> Date: Fri, 17 Jan 2025 10:32:54 +0100 Subject: [PATCH 07/18] =?UTF-8?q?Updated=20implementation=20of=20=E2=80=9C?= =?UTF-8?q?overwrite=5Ffiles=E2=80=9D=20mode?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- awswrangler/s3/_write.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/awswrangler/s3/_write.py b/awswrangler/s3/_write.py index 6cc120da8..40a467fc5 100644 --- a/awswrangler/s3/_write.py +++ b/awswrangler/s3/_write.py @@ -6,7 +6,7 @@ import uuid from abc import ABC, abstractmethod from enum import Enum -from typing import TYPE_CHECKING, Any, Callable, NamedTuple, Tuple +from typing import TYPE_CHECKING, Any, Callable, NamedTuple import boto3 import pandas as pd @@ -35,7 +35,7 @@ } -def _compose_filename_prefix_for_mode(*, mode: str, filename_prefix: str = None) -> Tuple[str, str]: +def _compose_filename_prefix_for_mode(*, mode: str, filename_prefix: str = None) -> tuple[str, str]: if mode == "overwrite_files": if filename_prefix is None: filename_prefix = "part" From 4b9a534fe7a921d2bc12cb564426d031edae8e43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20Jose=CC=81=20Pereira=20Vieito?= <pvieito@gmail.com> Date: Fri, 17 Jan 2025 10:40:23 +0100 Subject: [PATCH 08/18] =?UTF-8?q?Updated=20implementation=20of=20=E2=80=9C?= =?UTF-8?q?overwrite=5Ffiles=E2=80=9D=20mode?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- awswrangler/s3/_write.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/awswrangler/s3/_write.py b/awswrangler/s3/_write.py index 40a467fc5..46195ea89 100644 --- a/awswrangler/s3/_write.py +++ b/awswrangler/s3/_write.py @@ -35,7 +35,11 @@ } -def _compose_filename_prefix_for_mode(*, mode: str, filename_prefix: str = None) -> tuple[str, str]: +def _load_mode_and_filename_prefix( + *, mode: str | None, filename_prefix: str | None = None) -> tuple[str, str]: + if mode is None: + mode = "append" + if mode == "overwrite_files": if filename_prefix is None: filename_prefix = "part" @@ -299,8 +303,7 @@ def write( # noqa: PLR0913 dtype = dtype if dtype else {} partitions_values: dict[str, list[str]] = {} - mode, filename_prefix = _compose_filename_prefix_for_mode(mode=mode, filename_prefix=filename_prefix) - mode = "append" if mode is None else mode + mode, filename_prefix = _load_mode_and_filename_prefix(mode=mode, filename_prefix=filename_prefix) cpus: int = _utils.ensure_cpu_count(use_threads=use_threads) s3_client = _utils.client(service_name="s3", session=boto3_session) From 3206a2b8032798e553c21ebb545275d29ea52010 Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?Pedro=20Jose=CC=81=20Pereira=20Vieito?= <pvieito@gmail.com> Date: Fri, 17 Jan 2025 10:41:53 +0100 Subject: [PATCH 09/18] =?UTF-8?q?Updated=20implementation=20of=20=E2=80=9C?= =?UTF-8?q?overwrite=5Ffiles=E2=80=9D=20mode?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- awswrangler/s3/_write.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/awswrangler/s3/_write.py b/awswrangler/s3/_write.py index 46195ea89..aafca1d7b 100644 --- a/awswrangler/s3/_write.py +++ b/awswrangler/s3/_write.py @@ -35,8 +35,7 @@ } -def _load_mode_and_filename_prefix( - *, mode: str | None, filename_prefix: str | None = None) -> tuple[str, str]: +def _load_mode_and_filename_prefix(*, mode: str | None, filename_prefix: str | None = None) -> tuple[str, str]: if mode is None: mode = "append" From 2d00362732ee738e6c0428c2fb628ef7df8b8f14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20Jose=CC=81=20Pereira=20Vieito?= <pvieito@gmail.com> Date: Fri, 17 Jan 2025 11:25:14 +0100 Subject: [PATCH 10/18] =?UTF-8?q?Updated=20implementation=20of=20=E2=80=9C?= =?UTF-8?q?overwrite=5Ffiles=E2=80=9D=20mode?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- awswrangler/s3/_write.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/awswrangler/s3/_write.py b/awswrangler/s3/_write.py index aafca1d7b..d879fe781 100644 --- a/awswrangler/s3/_write.py +++ b/awswrangler/s3/_write.py @@ -107,11 +107,6 @@ def _validate_args( raise exceptions.InvalidArgumentCombination("Please, pass dataset=True to be able to use bucketing_info.") if mode is not None: raise exceptions.InvalidArgumentCombination("Please pass dataset=True to be able to use mode.") - if mode == "overwrite_files" and (max_rows_by_file or bucketing_info): - raise exceptions.InvalidArgumentValue( - "When mode is set to 'overwrite_files', the " - "`max_rows_by_file` and `bucketing_info` arguments cannot be set." - ) if any(arg is not None for arg in (table, description, parameters, columns_comments, columns_parameters)): raise exceptions.InvalidArgumentCombination( "Please pass dataset=True to be able to use any one of these " @@ -131,6 +126,11 @@ def _validate_args( raise exceptions.InvalidArgumentValue( "Please pass a value greater than 1 for the number of buckets for bucketing." ) + elif mode == "overwrite_files" and (max_rows_by_file or bucketing_info): + raise exceptions.InvalidArgumentValue( + "When mode is set to 'overwrite_files', the " + "`max_rows_by_file` and `bucketing_info` arguments cannot be set." 
+ ) class _SanitizeResult(NamedTuple): From f86b8488cd81831ad050afd6d74cd4f03acce1bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20Jose=CC=81=20Pereira=20Vieito?= <pvieito@gmail.com> Date: Fri, 17 Jan 2025 12:00:16 +0100 Subject: [PATCH 11/18] =?UTF-8?q?Updated=20implementation=20of=20=E2=80=9C?= =?UTF-8?q?overwrite=5Ffiles=E2=80=9D=20mode?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- awswrangler/s3/_write.py | 21 +-------------------- awswrangler/s3/_write_dataset.py | 20 ++++++++++++++++++++ awswrangler/s3/_write_text.py | 2 -- 3 files changed, 21 insertions(+), 22 deletions(-) diff --git a/awswrangler/s3/_write.py b/awswrangler/s3/_write.py index d879fe781..61864806e 100644 --- a/awswrangler/s3/_write.py +++ b/awswrangler/s3/_write.py @@ -35,24 +35,6 @@ } -def _load_mode_and_filename_prefix(*, mode: str | None, filename_prefix: str | None = None) -> tuple[str, str]: - if mode is None: - mode = "append" - - if mode == "overwrite_files": - if filename_prefix is None: - filename_prefix = "part" - random_filename_suffix = "" - mode = "append" - else: - random_filename_suffix = uuid.uuid4().hex - - if filename_prefix is None: - filename_prefix = "" - filename_prefix = filename_prefix + random_filename_suffix - return mode, filename_prefix - - def _extract_dtypes_from_table_input(table_input: dict[str, Any]) -> dict[str, str]: dtypes: dict[str, str] = {} for col in table_input["StorageDescriptor"]["Columns"]: @@ -301,8 +283,8 @@ def write( # noqa: PLR0913 partition_cols = partition_cols if partition_cols else [] dtype = dtype if dtype else {} partitions_values: dict[str, list[str]] = {} + mode = "append" if mode is None else mode - mode, filename_prefix = _load_mode_and_filename_prefix(mode=mode, filename_prefix=filename_prefix) cpus: int = _utils.ensure_cpu_count(use_threads=use_threads) s3_client = _utils.client(service_name="s3", session=boto3_session) @@ -351,7 +333,6 @@ def write( # noqa: PLR0913 paths = self._write_to_s3( df, path=path, - filename_prefix=filename_prefix, schema=schema, index=index, cpus=cpus, diff --git a/awswrangler/s3/_write_dataset.py b/awswrangler/s3/_write_dataset.py index c6d78a90a..a7ca28378 100644 --- a/awswrangler/s3/_write_dataset.py +++ b/awswrangler/s3/_write_dataset.py @@ -3,6 +3,7 @@ from __future__ import annotations import logging +import uuid from typing import Any, Callable import boto3 @@ -18,6 +19,24 @@ _logger: logging.Logger = logging.getLogger(__name__) +def _load_mode_and_filename_prefix(*, mode: str | None, filename_prefix: str | None = None) -> tuple[str, str]: + if mode is None: + mode = "append" + + if mode == "overwrite_files": + if filename_prefix is None: + filename_prefix = "part" + random_filename_suffix = "" + mode = "append" + else: + random_filename_suffix = uuid.uuid4().hex + + if filename_prefix is None: + filename_prefix = "" + filename_prefix = filename_prefix + random_filename_suffix + return mode, filename_prefix + + def _get_bucketing_series(df: pd.DataFrame, bucketing_info: typing.BucketingInfoTuple) -> pd.Series: bucket_number_series = ( df[bucketing_info[0]] @@ -212,6 +231,7 @@ def _to_dataset( ) -> tuple[list[str], dict[str, list[str]]]: path_root = path_root if path_root.endswith("/") else f"{path_root}/" # Evaluate mode + mode, filename_prefix = _load_mode_and_filename_prefix(mode=mode, filename_prefix=filename_prefix) if mode not in ["append", "overwrite", "overwrite_partitions"]: raise exceptions.InvalidArgumentValue( f"{mode} is a invalid mode, please use append, 
overwrite or overwrite_partitions." diff --git a/awswrangler/s3/_write_text.py b/awswrangler/s3/_write_text.py index b954ab6be..800de4aab 100644 --- a/awswrangler/s3/_write_text.py +++ b/awswrangler/s3/_write_text.py @@ -469,7 +469,6 @@ def to_csv( # noqa: PLR0912,PLR0915 partitions_values: dict[str, list[str]] = {} mode = "append" if mode is None else mode - filename_prefix = filename_prefix + uuid.uuid4().hex if filename_prefix else uuid.uuid4().hex s3_client = _utils.client(service_name="s3", session=boto3_session) # Sanitize table to respect Athena's standards @@ -919,7 +918,6 @@ def to_json( # noqa: PLR0912,PLR0915 partitions_values: dict[str, list[str]] = {} mode = "append" if mode is None else mode - filename_prefix = filename_prefix + uuid.uuid4().hex if filename_prefix else uuid.uuid4().hex s3_client = _utils.client(service_name="s3", session=boto3_session) # Sanitize table to respect Athena's standards From 49df50cddf0edab2e6feaabb3407647e93c23533 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20Jose=CC=81=20Pereira=20Vieito?= <pvieito@gmail.com> Date: Fri, 17 Jan 2025 12:04:46 +0100 Subject: [PATCH 12/18] =?UTF-8?q?Updated=20implementation=20of=20=E2=80=9C?= =?UTF-8?q?overwrite=5Ffiles=E2=80=9D=20mode?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- awswrangler/s3/_write.py | 1 - awswrangler/s3/_write_text.py | 1 - 2 files changed, 2 deletions(-) diff --git a/awswrangler/s3/_write.py b/awswrangler/s3/_write.py index 61864806e..e49d6da54 100644 --- a/awswrangler/s3/_write.py +++ b/awswrangler/s3/_write.py @@ -3,7 +3,6 @@ from __future__ import annotations import logging -import uuid from abc import ABC, abstractmethod from enum import Enum from typing import TYPE_CHECKING, Any, Callable, NamedTuple diff --git a/awswrangler/s3/_write_text.py b/awswrangler/s3/_write_text.py index 800de4aab..eab400f60 100644 --- a/awswrangler/s3/_write_text.py +++ b/awswrangler/s3/_write_text.py @@ -4,7 +4,6 @@ import csv import logging -import uuid from typing import TYPE_CHECKING, Any, Literal, cast import boto3 From 74e51c7ce7e53ea61f6e6f0c20360d5442f3e866 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20Jose=CC=81=20Pereira=20Vieito?= <pvieito@gmail.com> Date: Fri, 17 Jan 2025 12:08:38 +0100 Subject: [PATCH 13/18] =?UTF-8?q?Updated=20implementation=20of=20=E2=80=9C?= =?UTF-8?q?overwrite=5Ffiles=E2=80=9D=20mode?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- awswrangler/s3/_write_dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awswrangler/s3/_write_dataset.py b/awswrangler/s3/_write_dataset.py index a7ca28378..a95f9229d 100644 --- a/awswrangler/s3/_write_dataset.py +++ b/awswrangler/s3/_write_dataset.py @@ -220,7 +220,7 @@ def _to_dataset( concurrent_partitioning: bool, df: pd.DataFrame, path_root: str, - filename_prefix: str, + filename_prefix: str | None, index: bool, use_threads: bool | int, mode: str, From 2fe891db25e44fa417581d6edfbb73c6a337d68d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20Jose=CC=81=20Pereira=20Vieito?= <pvieito@gmail.com> Date: Fri, 17 Jan 2025 12:28:48 +0100 Subject: [PATCH 14/18] =?UTF-8?q?Updated=20implementation=20of=20=E2=80=9C?= =?UTF-8?q?overwrite=5Ffiles=E2=80=9D=20mode?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- awswrangler/_utils.py | 2 +- awswrangler/catalog/_create.py | 19 ++++++++++--------- awswrangler/s3/_write.py | 2 +- awswrangler/s3/_write_orc.py | 2 +- 
awswrangler/s3/_write_parquet.py | 2 +- awswrangler/s3/_write_text.py | 4 ++-- 6 files changed, 16 insertions(+), 15 deletions(-) diff --git a/awswrangler/_utils.py b/awswrangler/_utils.py index 01070ad64..28859d3d7 100644 --- a/awswrangler/_utils.py +++ b/awswrangler/_utils.py @@ -833,7 +833,7 @@ def block_waiting_available_thread(seq: Sequence[Future], max_workers: int) -> N def check_schema_changes(columns_types: dict[str, str], table_input: dict[str, Any] | None, mode: str) -> None: """Check schema changes.""" - if (table_input is not None) and (mode in ("append", "overwrite_partitions")): + if (table_input is not None) and (mode in ("append", "overwrite_partitions", "overwrite_files")): catalog_cols: dict[str, str] = {x["Name"]: x["Type"] for x in table_input["StorageDescriptor"]["Columns"]} for c, t in columns_types.items(): if c not in catalog_cols: diff --git a/awswrangler/catalog/_create.py b/awswrangler/catalog/_create.py index e81726429..09d55d7dc 100644 --- a/awswrangler/catalog/_create.py +++ b/awswrangler/catalog/_create.py @@ -31,7 +31,7 @@ def _update_if_necessary( if value is not None: if key not in dic or dic[key] != value: dic[key] = value - if mode in ("append", "overwrite_partitions"): + if mode in ("append", "overwrite_partitions", "overwrite_files"): return "update" return mode @@ -150,9 +150,10 @@ def _create_table( # noqa: PLR0912,PLR0915 client_glue = _utils.client(service_name="glue", session=boto3_session) skip_archive: bool = not catalog_versioning - if mode not in ("overwrite", "append", "overwrite_partitions", "update"): + if mode not in ("overwrite", "append", "overwrite_partitions", "overwrite_files", "update"): raise exceptions.InvalidArgument( - f"{mode} is not a valid mode. It must be 'overwrite', 'append' or 'overwrite_partitions'." + f"{mode} is not a valid mode. It must be 'overwrite', " + f"'append', 'overwrite_partitions' or 'overwrite_files'." 
) args: dict[str, Any] = _catalog_id( catalog_id=catalog_id, @@ -304,7 +305,7 @@ def _create_parquet_table( _logger.debug("catalog_table_input: %s", catalog_table_input) table_input: dict[str, Any] - if (catalog_table_input is not None) and (mode in ("append", "overwrite_partitions")): + if (catalog_table_input is not None) and (mode in ("append", "overwrite_partitions", "overwrite_files")): table_input = catalog_table_input is_table_updated = _update_table_input(table_input, columns_types) @@ -366,7 +367,7 @@ def _create_orc_table( _logger.debug("catalog_table_input: %s", catalog_table_input) table_input: dict[str, Any] - if (catalog_table_input is not None) and (mode in ("append", "overwrite_partitions")): + if (catalog_table_input is not None) and (mode in ("append", "overwrite_partitions", "overwrite_files")): table_input = catalog_table_input is_table_updated = _update_table_input(table_input, columns_types) @@ -436,7 +437,7 @@ def _create_csv_table( _utils.check_schema_changes(columns_types=columns_types, table_input=catalog_table_input, mode=mode) table_input: dict[str, Any] - if (catalog_table_input is not None) and (mode in ("append", "overwrite_partitions")): + if (catalog_table_input is not None) and (mode in ("append", "overwrite_partitions", "overwrite_files")): table_input = catalog_table_input is_table_updated = _update_table_input(table_input, columns_types, allow_reorder=False) @@ -508,7 +509,7 @@ def _create_json_table( table_input: dict[str, Any] if schema_evolution is False: _utils.check_schema_changes(columns_types=columns_types, table_input=catalog_table_input, mode=mode) - if (catalog_table_input is not None) and (mode in ("append", "overwrite_partitions")): + if (catalog_table_input is not None) and (mode in ("append", "overwrite_partitions", "overwrite_files")): table_input = catalog_table_input is_table_updated = _update_table_input(table_input, columns_types) @@ -1098,7 +1099,7 @@ def create_csv_table( If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it. schema_evolution If True allows schema evolution (new or missing columns), otherwise a exception will be raised. - (Only considered if dataset=True and mode in ("append", "overwrite_partitions")) + (Only considered if dataset=True and mode in ("append", "overwrite_partitions", "overwrite_files")) Related tutorial: https://aws-sdk-pandas.readthedocs.io/en/3.11.0/tutorials/014%20-%20Schema%20Evolution.html sep @@ -1278,7 +1279,7 @@ def create_json_table( If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it. schema_evolution If True allows schema evolution (new or missing columns), otherwise a exception will be raised. 
- (Only considered if dataset=True and mode in ("append", "overwrite_partitions")) + (Only considered if dataset=True and mode in ("append", "overwrite_partitions", "overwrite_files")) Related tutorial: https://aws-sdk-pandas.readthedocs.io/en/3.11.0/tutorials/014%20-%20Schema%20Evolution.html serde_library diff --git a/awswrangler/s3/_write.py b/awswrangler/s3/_write.py index e49d6da54..4669d5831 100644 --- a/awswrangler/s3/_write.py +++ b/awswrangler/s3/_write.py @@ -47,7 +47,7 @@ def _extract_dtypes_from_table_input(table_input: dict[str, Any]) -> dict[str, s def _apply_dtype( df: pd.DataFrame, dtype: dict[str, str], catalog_table_input: dict[str, Any] | None, mode: str ) -> pd.DataFrame: - if mode in ("append", "overwrite_partitions"): + if mode in ("append", "overwrite_partitions", "overwrite_files"): if catalog_table_input is not None: catalog_types: dict[str, str] | None = _extract_dtypes_from_table_input(table_input=catalog_table_input) if catalog_types is not None: diff --git a/awswrangler/s3/_write_orc.py b/awswrangler/s3/_write_orc.py index 7d1061874..c5f434a3a 100644 --- a/awswrangler/s3/_write_orc.py +++ b/awswrangler/s3/_write_orc.py @@ -414,7 +414,7 @@ def to_orc( If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it. schema_evolution If True allows schema evolution (new or missing columns), otherwise a exception will be raised. True by default. - (Only considered if dataset=True and mode in ("append", "overwrite_partitions")) + (Only considered if dataset=True and mode in ("append", "overwrite_partitions", "overwrite_files")) Related tutorial: https://aws-sdk-pandas.readthedocs.io/en/3.11.0/tutorials/014%20-%20Schema%20Evolution.html database diff --git a/awswrangler/s3/_write_parquet.py b/awswrangler/s3/_write_parquet.py index 835d2ebfd..9bef70999 100644 --- a/awswrangler/s3/_write_parquet.py +++ b/awswrangler/s3/_write_parquet.py @@ -444,7 +444,7 @@ def to_parquet( If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it. schema_evolution If True allows schema evolution (new or missing columns), otherwise a exception will be raised. True by default. - (Only considered if dataset=True and mode in ("append", "overwrite_partitions")) + (Only considered if dataset=True and mode in ("append", "overwrite_partitions", "overwrite_files")) Related tutorial: https://aws-sdk-pandas.readthedocs.io/en/3.11.0/tutorials/014%20-%20Schema%20Evolution.html database diff --git a/awswrangler/s3/_write_text.py b/awswrangler/s3/_write_text.py index eab400f60..827a440ec 100644 --- a/awswrangler/s3/_write_text.py +++ b/awswrangler/s3/_write_text.py @@ -179,7 +179,7 @@ def to_csv( # noqa: PLR0912,PLR0915 If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it. schema_evolution If True allows schema evolution (new or missing columns), otherwise a exception will be raised. - (Only considered if dataset=True and mode in ("append", "overwrite_partitions")). False by default. + (Only considered if dataset=True and mode in ("append", "overwrite_partitions", "overwrite_files")). False by default. Related tutorial: https://aws-sdk-pandas.readthedocs.io/en/3.11.0/tutorials/014%20-%20Schema%20Evolution.html database @@ -724,7 +724,7 @@ def to_json( # noqa: PLR0912,PLR0915 If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it. 
schema_evolution If True allows schema evolution (new or missing columns), otherwise a exception will be raised. - (Only considered if dataset=True and mode in ("append", "overwrite_partitions")) + (Only considered if dataset=True and mode in ("append", "overwrite_partitions", "overwrite_files")) Related tutorial: https://aws-sdk-pandas.readthedocs.io/en/3.11.0/tutorials/014%20-%20Schema%20Evolution.html database From 1b879bfa7ba7c3ebeab91130d62eb7859723d84a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20Jose=CC=81=20Pereira=20Vieito?= <pvieito@gmail.com> Date: Fri, 17 Jan 2025 13:01:59 +0100 Subject: [PATCH 15/18] Added info comment --- awswrangler/s3/_write_dataset.py | 86 ++++++++++++++++---------------- 1 file changed, 44 insertions(+), 42 deletions(-) diff --git a/awswrangler/s3/_write_dataset.py b/awswrangler/s3/_write_dataset.py index a95f9229d..d5e4027ee 100644 --- a/awswrangler/s3/_write_dataset.py +++ b/awswrangler/s3/_write_dataset.py @@ -24,8 +24,10 @@ def _load_mode_and_filename_prefix(*, mode: str | None, filename_prefix: str | N mode = "append" if mode == "overwrite_files": + # In `overwrite_files` mode, we need create a deterministic + # filename to ensure that the same file is overwritten: if filename_prefix is None: - filename_prefix = "part" + filename_prefix = "data" random_filename_suffix = "" mode = "append" else: @@ -68,8 +70,8 @@ def _get_bucket_number(number_of_buckets: int, values: list[str | int | bool]) - def _get_value_hash(value: str | int | bool) -> int: if isinstance(value, (int, np.int_)): value = int(value) - bigint_min, bigint_max = -(2**63), 2**63 - 1 - int_min, int_max = -(2**31), 2**31 - 1 + bigint_min, bigint_max = -(2 ** 63), 2 ** 63 - 1 + int_min, int_max = -(2 ** 31), 2 ** 31 - 1 if not bigint_min <= value <= bigint_max: raise ValueError(f"{value} exceeds the range that Athena cannot handle as bigint.") if not int_min <= value <= int_max: @@ -97,13 +99,13 @@ def _get_subgroup_prefix(keys: tuple[str, None], partition_cols: list[str], path def _delete_objects( - keys: tuple[str, None], - path_root: str, - use_threads: bool | int, - mode: str, - partition_cols: list[str], - boto3_session: boto3.Session | None = None, - **func_kwargs: Any, + keys: tuple[str, None], + path_root: str, + use_threads: bool | int, + mode: str, + partition_cols: list[str], + boto3_session: boto3.Session | None = None, + **func_kwargs: Any, ) -> str: # Keys are either a primitive type or a tuple if partitioning by multiple cols keys = (keys,) if not isinstance(keys, tuple) else keys @@ -120,17 +122,17 @@ def _delete_objects( @engine.dispatch_on_engine def _to_partitions( - df: pd.DataFrame, - func: Callable[..., list[str]], - concurrent_partitioning: bool, - path_root: str, - use_threads: bool | int, - mode: str, - partition_cols: list[str], - bucketing_info: typing.BucketingInfoTuple | None, - filename_prefix: str, - boto3_session: boto3.Session | None, - **func_kwargs: Any, + df: pd.DataFrame, + func: Callable[..., list[str]], + concurrent_partitioning: bool, + path_root: str, + use_threads: bool | int, + mode: str, + partition_cols: list[str], + bucketing_info: typing.BucketingInfoTuple | None, + filename_prefix: str, + boto3_session: boto3.Session | None, + **func_kwargs: Any, ) -> tuple[list[str], dict[str, list[str]]]: partitions_values: dict[str, list[str]] = {} proxy: _WriteProxy = _WriteProxy(use_threads=concurrent_partitioning) @@ -187,15 +189,15 @@ def _to_partitions( @engine.dispatch_on_engine def _to_buckets( - df: pd.DataFrame, - func: Callable[..., 
list[str]], - path_root: str, - bucketing_info: typing.BucketingInfoTuple, - filename_prefix: str, - boto3_session: boto3.Session | None, - use_threads: bool | int, - proxy: _WriteProxy | None = None, - **func_kwargs: Any, + df: pd.DataFrame, + func: Callable[..., list[str]], + path_root: str, + bucketing_info: typing.BucketingInfoTuple, + filename_prefix: str, + boto3_session: boto3.Session | None, + use_threads: bool | int, + proxy: _WriteProxy | None = None, + **func_kwargs: Any, ) -> list[str]: _proxy: _WriteProxy = proxy if proxy else _WriteProxy(use_threads=False) s3_client = client(service_name="s3", session=boto3_session) @@ -216,18 +218,18 @@ def _to_buckets( def _to_dataset( - func: Callable[..., list[str]], - concurrent_partitioning: bool, - df: pd.DataFrame, - path_root: str, - filename_prefix: str | None, - index: bool, - use_threads: bool | int, - mode: str, - partition_cols: list[str] | None, - bucketing_info: typing.BucketingInfoTuple | None, - boto3_session: boto3.Session | None, - **func_kwargs: Any, + func: Callable[..., list[str]], + concurrent_partitioning: bool, + df: pd.DataFrame, + path_root: str, + filename_prefix: str | None, + index: bool, + use_threads: bool | int, + mode: str, + partition_cols: list[str] | None, + bucketing_info: typing.BucketingInfoTuple | None, + boto3_session: boto3.Session | None, + **func_kwargs: Any, ) -> tuple[list[str], dict[str, list[str]]]: path_root = path_root if path_root.endswith("/") else f"{path_root}/" # Evaluate mode From ce2e362546dfd51651bfbd81b9217eb88509c711 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20Jose=CC=81=20Pereira=20Vieito?= <pvieito@gmail.com> Date: Fri, 17 Jan 2025 13:02:14 +0100 Subject: [PATCH 16/18] Added info comment --- awswrangler/s3/_write_dataset.py | 82 ++++++++++++++++---------------- 1 file changed, 41 insertions(+), 41 deletions(-) diff --git a/awswrangler/s3/_write_dataset.py b/awswrangler/s3/_write_dataset.py index d5e4027ee..74e4b7e8d 100644 --- a/awswrangler/s3/_write_dataset.py +++ b/awswrangler/s3/_write_dataset.py @@ -70,8 +70,8 @@ def _get_bucket_number(number_of_buckets: int, values: list[str | int | bool]) - def _get_value_hash(value: str | int | bool) -> int: if isinstance(value, (int, np.int_)): value = int(value) - bigint_min, bigint_max = -(2 ** 63), 2 ** 63 - 1 - int_min, int_max = -(2 ** 31), 2 ** 31 - 1 + bigint_min, bigint_max = -(2**63), 2**63 - 1 + int_min, int_max = -(2**31), 2**31 - 1 if not bigint_min <= value <= bigint_max: raise ValueError(f"{value} exceeds the range that Athena cannot handle as bigint.") if not int_min <= value <= int_max: @@ -99,13 +99,13 @@ def _get_subgroup_prefix(keys: tuple[str, None], partition_cols: list[str], path def _delete_objects( - keys: tuple[str, None], - path_root: str, - use_threads: bool | int, - mode: str, - partition_cols: list[str], - boto3_session: boto3.Session | None = None, - **func_kwargs: Any, + keys: tuple[str, None], + path_root: str, + use_threads: bool | int, + mode: str, + partition_cols: list[str], + boto3_session: boto3.Session | None = None, + **func_kwargs: Any, ) -> str: # Keys are either a primitive type or a tuple if partitioning by multiple cols keys = (keys,) if not isinstance(keys, tuple) else keys @@ -122,17 +122,17 @@ def _delete_objects( @engine.dispatch_on_engine def _to_partitions( - df: pd.DataFrame, - func: Callable[..., list[str]], - concurrent_partitioning: bool, - path_root: str, - use_threads: bool | int, - mode: str, - partition_cols: list[str], - bucketing_info: typing.BucketingInfoTuple | None, - 
filename_prefix: str, - boto3_session: boto3.Session | None, - **func_kwargs: Any, + df: pd.DataFrame, + func: Callable[..., list[str]], + concurrent_partitioning: bool, + path_root: str, + use_threads: bool | int, + mode: str, + partition_cols: list[str], + bucketing_info: typing.BucketingInfoTuple | None, + filename_prefix: str, + boto3_session: boto3.Session | None, + **func_kwargs: Any, ) -> tuple[list[str], dict[str, list[str]]]: partitions_values: dict[str, list[str]] = {} proxy: _WriteProxy = _WriteProxy(use_threads=concurrent_partitioning) @@ -189,15 +189,15 @@ def _to_partitions( @engine.dispatch_on_engine def _to_buckets( - df: pd.DataFrame, - func: Callable[..., list[str]], - path_root: str, - bucketing_info: typing.BucketingInfoTuple, - filename_prefix: str, - boto3_session: boto3.Session | None, - use_threads: bool | int, - proxy: _WriteProxy | None = None, - **func_kwargs: Any, + df: pd.DataFrame, + func: Callable[..., list[str]], + path_root: str, + bucketing_info: typing.BucketingInfoTuple, + filename_prefix: str, + boto3_session: boto3.Session | None, + use_threads: bool | int, + proxy: _WriteProxy | None = None, + **func_kwargs: Any, ) -> list[str]: _proxy: _WriteProxy = proxy if proxy else _WriteProxy(use_threads=False) s3_client = client(service_name="s3", session=boto3_session) @@ -218,18 +218,18 @@ def _to_buckets( def _to_dataset( - func: Callable[..., list[str]], - concurrent_partitioning: bool, - df: pd.DataFrame, - path_root: str, - filename_prefix: str | None, - index: bool, - use_threads: bool | int, - mode: str, - partition_cols: list[str] | None, - bucketing_info: typing.BucketingInfoTuple | None, - boto3_session: boto3.Session | None, - **func_kwargs: Any, + func: Callable[..., list[str]], + concurrent_partitioning: bool, + df: pd.DataFrame, + path_root: str, + filename_prefix: str | None, + index: bool, + use_threads: bool | int, + mode: str, + partition_cols: list[str] | None, + bucketing_info: typing.BucketingInfoTuple | None, + boto3_session: boto3.Session | None, + **func_kwargs: Any, ) -> tuple[list[str], dict[str, list[str]]]: path_root = path_root if path_root.endswith("/") else f"{path_root}/" # Evaluate mode From d5c6b78b79396758c5a558582ec83769ea332fae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20Jose=CC=81=20Pereira=20Vieito?= <pvieito@gmail.com> Date: Fri, 17 Jan 2025 13:03:31 +0100 Subject: [PATCH 17/18] Added info comment --- awswrangler/s3/_write_dataset.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/awswrangler/s3/_write_dataset.py b/awswrangler/s3/_write_dataset.py index 74e4b7e8d..8ac82f88b 100644 --- a/awswrangler/s3/_write_dataset.py +++ b/awswrangler/s3/_write_dataset.py @@ -24,8 +24,8 @@ def _load_mode_and_filename_prefix(*, mode: str | None, filename_prefix: str | N mode = "append" if mode == "overwrite_files": - # In `overwrite_files` mode, we need create a deterministic - # filename to ensure that the same file is overwritten: + # In `overwrite_files` mode, we need to create a deterministic + # filename to ensure that the same file is always overwritten: if filename_prefix is None: filename_prefix = "data" random_filename_suffix = "" From 18ac992636bea2649a2b5bda2ed4f3bed359f9c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20Jose=CC=81=20Pereira=20Vieito?= <pvieito@gmail.com> Date: Fri, 17 Jan 2025 13:27:14 +0100 Subject: [PATCH 18/18] Added info comment --- awswrangler/s3/_write_dataset.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/awswrangler/s3/_write_dataset.py 
b/awswrangler/s3/_write_dataset.py index 8ac82f88b..fc09922cc 100644 --- a/awswrangler/s3/_write_dataset.py +++ b/awswrangler/s3/_write_dataset.py @@ -24,8 +24,8 @@ def _load_mode_and_filename_prefix(*, mode: str | None, filename_prefix: str | N mode = "append" if mode == "overwrite_files": - # In `overwrite_files` mode, we need to create a deterministic - # filename to ensure that the same file is always overwritten: + # In `overwrite_files` mode, we need to create deterministic + # filenames to ensure that the same files are always overwritten: if filename_prefix is None: filename_prefix = "data" random_filename_suffix = ""
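
Taken together, the series adds an `overwrite_files` write mode: the random UUID filename suffix is suppressed and the prefix defaults to `"data"`, so repeated writes replace the same S3 objects instead of accumulating new files, and `max_rows_by_file`/`bucketing_info` are rejected for this mode. A minimal usage sketch, assuming the mode ships as implemented in these patches; the bucket, prefix, and the example object name in the comment are illustrative, not taken from the patch:

```python
import awswrangler as wr
import pandas as pd

df = pd.DataFrame({"id": [1, 2, 3], "year": [2024, 2024, 2025]})

# With mode="overwrite_files", filenames are deterministic (prefix "data",
# no random suffix), so rerunning this call rewrites the same object per
# partition (e.g. something like "data.snappy.parquet") rather than
# appending a new UUID-suffixed file each time.
wr.s3.to_parquet(
    df,
    path="s3://my-bucket/my-prefix/",  # hypothetical bucket/prefix
    dataset=True,                      # mode requires dataset=True
    mode="overwrite_files",
    partition_cols=["year"],
    # filename_prefix="part",          # optional: custom deterministic prefix
    # max_rows_by_file / bucketing_info must not be set in this mode
)
```

Under the hood (per patches 11-18), `_load_mode_and_filename_prefix` in `_write_dataset.py` maps `overwrite_files` back to `append` after fixing the prefix, so the rest of the dataset-writing path is unchanged; the same mode literal is also threaded through the Glue catalog helpers so schema evolution and `dtype` handling treat it like `append`/`overwrite_partitions`.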