QoL: Drop column #2754

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open · wants to merge 8 commits into devel

53 changes: 53 additions & 0 deletions dlt/cli/pipeline_command.py
@@ -428,3 +428,56 @@ def _display_pending_packages() -> Tuple[Sequence[str], Sequence[str]]:
fmt.warning(warning)
if fmt.confirm("Do you want to apply these changes?", default=False):
drop()

if operation == "drop-columns":
drop = DropCommand(p, **command_kwargs)
if drop.is_empty:
fmt.echo(
"Could not select any columns to drop. Use"
" the command below to inspect the pipeline:"
)
fmt.echo(f"dlt pipeline -v {p.pipeline_name} info")
if len(drop.info["warnings"]):
fmt.echo("Additional warnings are available")
for warning in drop.info["warnings"]:
fmt.warning(warning)
if len(drop.info["notes"]):
fmt.echo("Additional notes are available")
for note in drop.info["notes"]:
fmt.echo(fmt.style(note, fg="yellow"))
return

fmt.echo(
"About to drop the following columns in dataset %s in destination %s:"
% (
fmt.bold(p.dataset_name),
fmt.bold(p.destination.destination_name),
)
)
fmt.echo("%s: %s" % (fmt.style("Selected schema", fg="green"), drop.info["schema_name"]))
fmt.echo(
"%s: %s"
% (
fmt.style("Selected resource(s)", fg="green"),
drop.info["resource_names"],
)
)
fmt.echo("%s: %s" % (fmt.style("Table(s) to be affected", fg="green"), drop.info["tables"]))
if drop.from_tables_drop_cols:
fmt.echo(f"{fmt.style('Column(s) to be dropped', fg='green')}:")
for from_table_drop_cols in drop.from_tables_drop_cols:
table_name = from_table_drop_cols["from_table"]
columns = from_table_drop_cols["drop_columns"]
fmt.echo(f"\t{fmt.style('from table:', fg='green')} {table_name}")
fmt.echo(f"\t\t{fmt.style('columns:', fg='green')} {columns}")

if len(drop.info["warnings"]):
fmt.echo("Additional warnings are available")
for warning in drop.info["warnings"]:
fmt.warning(warning)
if len(drop.info["notes"]):
fmt.echo("Additional info is available:")
for note in drop.info["notes"]:
fmt.echo(fmt.style(note, fg="yellow"))
if fmt.confirm("Do you want to apply these changes?", default=False):
drop()
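
For orientation, a minimal sketch of the shape the loop above assumes for `drop.from_tables_drop_cols` (table and column names are illustrative, taken from the example output in the new `drop-columns` help text):

```python
# Assumed shape of DropCommand.from_tables_drop_cols (hypothetical values)
from_tables_drop_cols = [
    {"from_table": "droppable_b__items", "drop_columns": ["m"]},
]

for spec in from_tables_drop_cols:
    print(f"\tfrom table: {spec['from_table']}")
    print(f"\t\tcolumns: {spec['drop_columns']}")
```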
84 changes: 78 additions & 6 deletions dlt/cli/plugins.py
@@ -324,11 +324,11 @@ def configure_parser(self, pipe_cmd: argparse.ArgumentParser) -> None:

```text
About to drop the following data in dataset airflow_events_1 in destination dlt.destinations.duckdb:
Selected schema:: github_repo_events
Selected resource(s):: ['repo_events']
Table(s) to drop:: ['issues_event', 'fork_event', 'pull_request_event', 'pull_request_review_event', 'pull_request_review_comment_event', 'watch_event', 'issue_comment_event', 'push_event__payload__commits', 'push_event']
Resource(s) state to reset:: ['repo_events']
Source state path(s) to reset:: []
Selected schema: github_repo_events
Selected resource(s): ['repo_events']
Table(s) to drop: ['issues_event', 'fork_event', 'pull_request_event', 'pull_request_review_event', 'pull_request_review_comment_event', 'watch_event', 'issue_comment_event', 'push_event__payload__commits', 'push_event']
Resource(s) state to reset: ['repo_events']
Source state path(s) to reset: []
Do you want to apply these changes? [y/N]
```

@@ -400,7 +400,7 @@ def configure_parser(self, pipe_cmd: argparse.ArgumentParser) -> None:
help="Drop all resources found in schema. Supersedes [resources] argument.",
)
pipe_cmd_drop.add_argument(
"--state-paths", nargs="*", help="State keys or json paths to drop", default=()
"--state-paths", nargs="*", help="State keys or json paths to drop.", default=()
)
pipe_cmd_drop.add_argument(
"--schema",
@@ -432,6 +432,78 @@ def configure_parser(self, pipe_cmd: argparse.ArgumentParser) -> None:
help="Load id of completed or normalized package. Defaults to the most recent package.",
)

pipe_cmd_drop_columns = pipeline_subparsers.add_parser(
"drop-columns",
help="Selectively drop columns from specified tables",
description="""
Selectively drop columns from specified resources and tables.

```sh
dlt pipeline <pipeline name> drop-columns --from-resources [resource_1] [resource_2] --from-tables [table_1] [table_2] --columns [column_1] [column_2]
```

**How column selection works:**

1. **Resource resolution**: If `--from-resources` is specified, tables are grouped by resource using regex pattern matching. If omitted, all resources are considered.

2. **Table resolution**: If `--from-tables` is specified, only tables matching the pattern(s) within the selected resources are considered. If omitted, all tables from selected resources are considered.

3. **Column resolution**: Columns are matched against the specified pattern(s) within the selected tables. Only nullable columns without disqualifying hints can be dropped.

**Column safety rules:**

Only columns that meet ALL of the following criteria can be dropped:
- The column is nullable (can contain NULL values)
- The column does not have any disqualifying hints such as: `partition`, `cluster`, `unique`, `sort`, `primary_key`, `row_key`, `parent_key`, `root_key`, `merge_key`, `variant`, `hard_delete`, `dedup_sort`, or `incremental`
- After dropping the matched columns, at least one non-dlt internal column must remain in the table

**Filesystem destination note:**

For filesystem destination, column dropping is only supported for tables that have an associated `table_format` (e.g., Iceberg, Delta). Tables without a table format will be skipped with a notification.

**Example Output:**

```text
About to drop the following columns in dataset my_dataset in destination dlt.destinations.duckdb:
Selected schema: droppable_source
Selected resource(s): ['droppable_b']
Table(s) to be affected: ['droppable_b__items']
Column(s) to be dropped:
from table: droppable_b__items
columns: ['m']
Do you want to apply these changes?
```

**Pattern matching:**

You can use regexes to select resources, tables and columns. Prepend the `re:` string to indicate a regex pattern. For example:
- `--from-resources "re:^user"` matches all resources starting with "user"
- `--columns "re:.*_temp$"` matches all columns ending with "_temp"
""",
epilog=(
f"See {DLT_PIPELINE_COMMAND_DOCS_URL}#selectively-drop-tables-and-reset-state for"
" more info"
),
)
pipe_cmd_drop_columns.add_argument(
"--from-resources",
dest="from_resources",
nargs="*",
help="Resource names to drop columns from.",
)
pipe_cmd_drop_columns.add_argument(
"--from-tables",
dest="from_tables",
nargs="*",
help="Table names to drop columns from.",
)
pipe_cmd_drop_columns.add_argument(
"--columns",
nargs="*",
help="Column names to drop from the specified resources.",
default=(),
)
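
To make the `re:` prefix convention from the help text concrete, here is a small matcher sketch; it illustrates the selection semantics only and is not the dlt implementation (the function name and exact matching rules are assumptions):

```python
import re
from typing import Iterable, List


def select_names(patterns: List[str], names: Iterable[str]) -> List[str]:
    """Illustrative only: a pattern prefixed with 're:' is treated as a regex,
    anything else must match the name exactly."""
    selected = []
    for name in names:
        for pattern in patterns:
            if pattern.startswith("re:"):
                if re.search(pattern[3:], name):
                    selected.append(name)
                    break
            elif pattern == name:
                selected.append(name)
                break
    return selected


# select_names(["re:.*_temp$"], ["size_temp", "size"]) -> ["size_temp"]
# select_names(["re:^user"], ["users", "orders"]) -> ["users"]
```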

def execute(self, args: argparse.Namespace) -> None:
if args.list_pipelines:
pipeline_command_wrapper("list", "-", args.pipelines_dir, args.verbosity)
34 changes: 34 additions & 0 deletions dlt/common/libs/deltalake.py
@@ -113,6 +113,40 @@ def write_delta_table(
)


def drop_columns_delta_table(
table: DeltaTable,
columns_to_drop: List[str],
) -> None:
"""Drops columns from a Delta table by rewriting it with the remaining columns.

This function reads the entire table, removes the specified columns, and rewrites
the table with the remaining columns. This is a workaround for the limitation
that delta-rs cannot natively drop columns without column mapping enabled.

Args:
table: The DeltaTable to modify (should already have storage options configured)
columns_to_drop: List of column names to drop
"""
arrow_table = table.to_pyarrow_table()

current_schema = arrow_table.schema
remaining_columns = [col for col in current_schema.names if col not in columns_to_drop]

filtered_table = arrow_table.select(remaining_columns)

metadata = table.metadata()
partition_columns = [col for col in metadata.partition_columns if col in remaining_columns]

write_deltalake(
table_or_uri=table,
data=ensure_delta_compatible_arrow_data(filtered_table, partition_columns or None),
partition_by=partition_columns or None,
mode="overwrite",
schema_mode="overwrite",
)


def merge_delta_table(
table: DeltaTable,
data: Union[pa.Table, pa.RecordBatchReader],
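
A usage sketch for the new `drop_columns_delta_table` helper above, assuming a locally reachable Delta table; the path and column name are hypothetical, and in dlt the `DeltaTable` instance comes from the filesystem client with storage options already configured:

```python
from deltalake import DeltaTable

from dlt.common.libs.deltalake import drop_columns_delta_table

# hypothetical local table; the "m" column is illustrative
table = DeltaTable("/data/my_dataset/droppable_b__items")
drop_columns_delta_table(table=table, columns_to_drop=["m"])
```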
2 changes: 2 additions & 0 deletions dlt/common/storages/load_package.py
@@ -71,6 +71,8 @@ class TPipelineStateDoc(TypedDict, total=False):


class TLoadPackageDropTablesState(TypedDict):
from_tables_drop_columns: NotRequired[List[Any]]
"""List of tables and columns that are to be dropped from them"""
dropped_tables: NotRequired[List[TTableSchema]]
"""List of tables that are to be dropped from the schema and destination (i.e. when `refresh` mode is used)"""
truncated_tables: NotRequired[List[TTableSchema]]
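
A sketch of a load package state carrying the new key; the values are illustrative and mirror the per-table mapping consumed by the CLI and destination clients in this PR:

```python
from dlt.common.storages.load_package import TLoadPackageDropTablesState

state: TLoadPackageDropTablesState = {
    "from_tables_drop_columns": [
        {"from_table": "droppable_b__items", "drop_columns": ["m"]},
    ],
}
```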
22 changes: 22 additions & 0 deletions dlt/destinations/impl/athena/athena.py
@@ -406,6 +406,28 @@ def should_load_data_to_staging_dataset_on_staging_destination(self, table_name:
return True
return super().should_load_data_to_staging_dataset_on_staging_destination(table_name)

def drop_columns(
self,
from_tables_drop_cols: List[Dict[str, Union[str, List[str]]]],
update_schema: bool = True,
) -> None:
"""Drops specified columns from specified tables, using appropriate method based on table format"""
for from_table_drop_cols in from_tables_drop_cols:
table_name = cast(str, from_table_drop_cols["from_table"])
columns_to_drop = cast(List[str], from_table_drop_cols["drop_columns"])

table_schema = self.prepare_load_table(table_name)

if self._is_iceberg_table(table_schema):
# For Iceberg tables, use the base SQL client method (ALTER TABLE DROP COLUMN)
self.sql_client.drop_columns([from_table_drop_cols])
else:
# For Hive tables, use the special REPLACE COLUMNS method
self.sql_client.drop_columns_hive(table_name, columns_to_drop)

if update_schema:
self._update_schema_in_storage(self.schema)

@staticmethod
def is_dbapi_exception(ex: Exception) -> bool:
from pyathena.error import Error
50 changes: 50 additions & 0 deletions dlt/destinations/impl/athena/sql_client.py
@@ -9,6 +9,9 @@
Dict,
Callable,
Type,
List,
Union,
cast,
)
from copy import deepcopy
import re
@@ -142,6 +145,53 @@ def drop_tables(self, *tables: str) -> None:
]
self.execute_many(statements)

def drop_columns(self, from_tables_drop_cols: List[Dict[str, Union[str, List[str]]]]) -> None:
"""Drops specified columns from specified tables if they exist"""
statements = []
for from_table_drop_cols in from_tables_drop_cols:
table = cast(str, from_table_drop_cols["from_table"])
for column in from_table_drop_cols["drop_columns"]:
statements.append(
f"ALTER TABLE {self.make_qualified_ddl_table_name(table)} DROP COLUMN"
f" {self.escape_ddl_identifier(column)};"
)

self.execute_many(statements)

def drop_columns_hive(self, table_name: str, columns_to_drop: List[str]) -> None:
"""Drop columns from Hive table using ALTER TABLE REPLACE COLUMNS"""
qualified_table_name = self.make_qualified_ddl_table_name(table_name)

# Get current table schema
describe_query = f"DESCRIBE {qualified_table_name}"
result = self.execute_sql(describe_query)

current_columns = []
if result:
for row in result:
# DESCRIBE returns one tab-separated string per row, e.g. ('col_name \tdata_type \tcomment',)
parts = row[0].split()
# skip blank rows and section headers such as "# Partition Information"
if len(parts) < 2 or parts[0].startswith("#"):
continue
current_columns.append(
{
"name": parts[0],
"type": parts[1],
}
)

remaining_columns = [col for col in current_columns if col["name"] not in columns_to_drop]

# Build column definitions for REPLACE COLUMNS
column_definitions = []
for col in remaining_columns:
col_name = self.escape_ddl_identifier(col["name"])
col_type = col["type"]
column_definitions.append(f"{col_name} {col_type}")

replace_sql = (
f"ALTER TABLE {qualified_table_name} REPLACE COLUMNS ({', '.join(column_definitions)})"
)
self.execute_sql(replace_sql)

@contextmanager
@raise_database_error
def begin_transaction(self) -> Iterator[DBTransaction]:
23 changes: 23 additions & 0 deletions dlt/destinations/impl/databricks/sql_client.py
@@ -7,6 +7,10 @@
Iterator,
Optional,
Sequence,
cast,
List,
Dict,
Union,
)
from databricks import sql as databricks_lib
from databricks.sql.client import (
@@ -106,6 +110,25 @@ def drop_tables(self, *tables: str) -> None:
with suppress(DatabaseUndefinedRelation):
super().drop_tables(*tables)

def drop_columns(self, from_tables_drop_cols: List[Dict[str, Union[str, List[str]]]]) -> None:
"""Drops specified columns from specified tables if they exist"""

statements = []
for from_table_drop_cols in from_tables_drop_cols:
table = cast(str, from_table_drop_cols["from_table"])
# enable Delta column mapping so that DROP COLUMN is supported as a metadata-only change
statements.append(
f"ALTER TABLE {self.make_qualified_table_name(table)} SET TBLPROPERTIES"
" ('delta.columnMapping.mode' = 'name', 'delta.minReaderVersion' = '2',"
" 'delta.minWriterVersion' = '5')"
)
for column in from_table_drop_cols["drop_columns"]:
statements.append(
f"ALTER TABLE {self.make_qualified_table_name(table)} DROP COLUMN IF EXISTS"
f" {self.escape_column_name(column)};"
)

self.execute_many(statements)

def execute_sql(
self, sql: AnyStr, *args: Any, **kwargs: Any
) -> Optional[Sequence[Sequence[Any]]]:
30 changes: 30 additions & 0 deletions dlt/destinations/impl/filesystem/filesystem.py
@@ -16,6 +16,7 @@
Literal,
Any,
Dict,
Union,
)
from fsspec import AbstractFileSystem

@@ -937,3 +938,32 @@ def is_open_table(self, table_format: TTableFormat, table_name: str) -> bool:
return False
detected_format = prepared_table.get("table_format")
return table_format == detected_format

def drop_columns(
self,
from_tables_drop_cols: List[Dict[str, Union[str, List[str]]]],
update_schema: bool = True,
) -> None:
for table_spec in from_tables_drop_cols:
table_name = cast(str, table_spec["from_table"])
columns_to_drop = list(table_spec["drop_columns"])

if self.is_open_table("iceberg", table_name):
ice_table = self.load_open_table("iceberg", table_name)
# Alter schema – delete columns one by one (requires incompatible changes flag)
with ice_table.update_schema(allow_incompatible_changes=True) as update:
for col in columns_to_drop:
update.delete_column(col)

elif self.is_open_table("delta", table_name):
from dlt.common.libs.deltalake import drop_columns_delta_table

delta_table = self.load_open_table("delta", table_name)
# Drop columns by rewriting the table
drop_columns_delta_table(
table=delta_table,
columns_to_drop=columns_to_drop,
)

if update_schema:
self._update_schema_in_storage(self.schema)