
Commit 9f9b7ee

Drop column support for various destinations
1 parent 02d81b3 commit 9f9b7ee

11 files changed: 311 additions & 15 deletions

dlt/common/libs/deltalake.py

Lines changed: 34 additions & 0 deletions
@@ -113,6 +113,40 @@ def write_delta_table(
     )
 
 
+def drop_columns_delta_table(
+    table: DeltaTable,
+    columns_to_drop: List[str],
+) -> None:
+    """Drops columns from a Delta table by rewriting it with the remaining columns.
+
+    This function reads the entire table, removes the specified columns, and rewrites
+    the table with the remaining columns. This is a workaround for the limitation
+    that delta-rs cannot natively drop columns without column mapping enabled.
+
+    Args:
+        table: The DeltaTable to modify (should already have storage options configured)
+        columns_to_drop: List of column names to drop
+    """
+    arrow_table = table.to_pyarrow_table()
+
+    current_schema = arrow_table.schema
+    remaining_columns = [col for col in current_schema.names if col not in columns_to_drop]
+
+    filtered_table = arrow_table.select(remaining_columns)
+
+    partition_columns = []
+    metadata = table.metadata()
+    partition_columns = [col for col in metadata.partition_columns if col in remaining_columns]
+
+    write_deltalake(
+        table_or_uri=table,
+        data=ensure_delta_compatible_arrow_data(filtered_table, partition_columns or None),
+        partition_by=partition_columns or None,
+        mode="overwrite",
+        schema_mode="overwrite",
+    )
+
+
 def merge_delta_table(
     table: DeltaTable,
     data: Union[pa.Table, pa.RecordBatchReader],
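
Note: a minimal usage sketch of the new helper, assuming the deltalake package is installed and an existing Delta table at an illustrative local path; because the helper rewrites the whole table, cost grows with table size.

# Standalone sketch, not part of the commit; the path and column names are illustrative.
from deltalake import DeltaTable

from dlt.common.libs.deltalake import drop_columns_delta_table

# open an existing Delta table; for remote stores, storage options would be passed here
table = DeltaTable("./data/events")

# rewrite the table without the listed columns; remaining data and partitioning are kept
drop_columns_delta_table(table, columns_to_drop=["raw_payload", "tmp_flag"])

print(DeltaTable("./data/events").to_pyarrow_table().column_names)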

dlt/destinations/impl/athena/athena.py

Lines changed: 22 additions & 0 deletions
@@ -406,6 +406,28 @@ def should_load_data_to_staging_dataset_on_staging_destination(self, table_name:
             return True
         return super().should_load_data_to_staging_dataset_on_staging_destination(table_name)
 
+    def drop_columns(
+        self,
+        from_tables_drop_cols: List[Dict[str, Union[str, List[str]]]],
+        update_schema: bool = True,
+    ) -> None:
+        """Drops specified columns from specified tables, using appropriate method based on table format"""
+        for from_table_drop_cols in from_tables_drop_cols:
+            table_name = cast(str, from_table_drop_cols["from_table"])
+            columns_to_drop = cast(List[str], from_table_drop_cols["drop_columns"])
+
+            table_schema = self.prepare_load_table(table_name)
+
+            if self._is_iceberg_table(table_schema):
+                # For Iceberg tables, use the base SQL client method (ALTER TABLE DROP COLUMN)
+                self.sql_client.drop_columns([from_table_drop_cols])
+            else:
+                # For Hive tables, use the special REPLACE COLUMNS method
+                self.sql_client.drop_columns_hive(table_name, columns_to_drop)
+
+        if update_schema:
+            self._update_schema_in_storage(self.schema)
+
     @staticmethod
     def is_dbapi_exception(ex: Exception) -> bool:
         from pyathena.error import Error
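
Note: the from_tables_drop_cols payload shared by these drop_columns implementations is a list of per-table specs. A hedged sketch of the expected shape (table and column names are illustrative); the commented call assumes an already configured Athena job client, e.g. one obtained from a pipeline.

from typing import Dict, List, Union

# one spec per table: the table name plus the columns to remove from it
from_tables_drop_cols: List[Dict[str, Union[str, List[str]]]] = [
    {"from_table": "events", "drop_columns": ["legacy_id", "tmp_flag"]},
    {"from_table": "users", "drop_columns": ["ssn"]},
]

# with a configured Athena job client, Iceberg tables take the ALTER TABLE
# DROP COLUMN path while Hive tables take the REPLACE COLUMNS path:
# client.drop_columns(from_tables_drop_cols, update_schema=True)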

dlt/destinations/impl/athena/sql_client.py

Lines changed: 34 additions & 0 deletions
@@ -158,6 +158,40 @@ def drop_columns(self, from_tables_drop_cols: List[Dict[str, Union[str, List[str
 
         self.execute_many(statements)
 
+    def drop_columns_hive(self, table_name: str, columns_to_drop: List[str]) -> None:
+        """Drop columns from Hive table using ALTER TABLE REPLACE COLUMNS"""
+        qualified_table_name = self.make_qualified_ddl_table_name(table_name)
+
+        # Get current table schema
+        describe_query = f"DESCRIBE {qualified_table_name}"
+        result = self.execute_sql(describe_query)
+
+        current_columns = []
+        if result:
+            for row in result:
+                # DESCRIBE returns rows like ('col_name \tdata_type \tcomment',)
+                parts = row[0].split()
+                current_columns.append(
+                    {
+                        "name": parts[0],
+                        "type": parts[1],
+                    }
+                )
+
+        remaining_columns = [col for col in current_columns if col["name"] not in columns_to_drop]
+
+        # Build column definitions for REPLACE COLUMNS
+        column_definitions = []
+        for col in remaining_columns:
+            col_name = self.escape_ddl_identifier(col["name"])
+            col_type = col["type"]
+            column_definitions.append(f"{col_name} {col_type}")
+
+        replace_sql = (
+            f"ALTER TABLE {qualified_table_name} REPLACE COLUMNS ({', '.join(column_definitions)})"
+        )
+        self.execute_sql(replace_sql)
+
     @contextmanager
     @raise_database_error
     def begin_transaction(self) -> Iterator[DBTransaction]:
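
Note: Hive-format Athena tables have no DROP COLUMN, so the method above re-declares the surviving columns. A self-contained sketch of the same statement shape, using plain identifiers instead of dlt's DDL escaping (names are illustrative):

from typing import Dict, List

def build_replace_columns_sql(
    qualified_table: str,
    current_columns: List[Dict[str, str]],
    columns_to_drop: List[str],
) -> str:
    """Re-declare the table with only the surviving columns, Hive-style."""
    remaining = [c for c in current_columns if c["name"] not in columns_to_drop]
    column_defs = ", ".join(f"{c['name']} {c['type']}" for c in remaining)
    return f"ALTER TABLE {qualified_table} REPLACE COLUMNS ({column_defs})"

# DESCRIBE output reduced to name/type pairs
schema = [
    {"name": "id", "type": "bigint"},
    {"name": "value", "type": "double"},
    {"name": "tmp_flag", "type": "boolean"},
]
print(build_replace_columns_sql("analytics.events", schema, ["tmp_flag"]))
# ALTER TABLE analytics.events REPLACE COLUMNS (id bigint, value double)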

dlt/destinations/impl/databricks/sql_client.py

Lines changed: 23 additions & 0 deletions
@@ -7,6 +7,10 @@
     Iterator,
     Optional,
     Sequence,
+    cast,
+    List,
+    Dict,
+    Union,
 )
 from databricks import sql as databricks_lib
 from databricks.sql.client import (
@@ -106,6 +110,25 @@ def drop_tables(self, *tables: str) -> None:
         with suppress(DatabaseUndefinedRelation):
             super().drop_tables(*tables)
 
+    def drop_columns(self, from_tables_drop_cols: List[Dict[str, Union[str, List[str]]]]) -> None:
+        """Drops specified columns from specified tables if they exist"""
+
+        statements = []
+        for from_table_drop_cols in from_tables_drop_cols:
+            table = cast(str, from_table_drop_cols["from_table"])
+            statements.append(
+                f"ALTER TABLE {self.make_qualified_table_name(table)} SET TBLPROPERTIES"
+                " ('delta.columnMapping.mode' = 'name', 'delta.minReaderVersion' = '2',"
+                " 'delta.minWriterVersion' = '5')"
+            )
+            for column in from_table_drop_cols["drop_columns"]:
+                statements.append(
+                    f"ALTER TABLE {self.make_qualified_table_name(table)} DROP COLUMN IF EXISTS"
+                    f" {self.escape_column_name(column)};"
+                )
+
+        self.execute_many(statements)
+
     def execute_sql(
         self, sql: AnyStr, *args: Any, **kwargs: Any
     ) -> Optional[Sequence[Sequence[Any]]]:
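
Note: Databricks can only drop columns from Delta tables with column mapping enabled, which is why each table first gets a SET TBLPROPERTIES statement. A sketch of the resulting statement sequence for one table, with backtick escaping standing in for the client's escape_column_name (names are illustrative):

from typing import List

def databricks_drop_column_statements(qualified_table: str, columns: List[str]) -> List[str]:
    """Approximate the statement sequence built by drop_columns above."""
    statements = [
        f"ALTER TABLE {qualified_table} SET TBLPROPERTIES"
        " ('delta.columnMapping.mode' = 'name', 'delta.minReaderVersion' = '2',"
        " 'delta.minWriterVersion' = '5')"
    ]
    statements += [
        f"ALTER TABLE {qualified_table} DROP COLUMN IF EXISTS `{col}`;" for col in columns
    ]
    return statements

for stmt in databricks_drop_column_statements("catalog.schema.events", ["tmp_flag"]):
    print(stmt)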

dlt/destinations/impl/filesystem/filesystem.py

Lines changed: 70 additions & 0 deletions
@@ -16,6 +16,7 @@
     Literal,
     Any,
     Dict,
+    Union,
 )
 from fsspec import AbstractFileSystem
 
@@ -937,3 +938,72 @@ def is_open_table(self, table_format: TTableFormat, table_name: str) -> bool:
             return False
         detected_format = prepared_table.get("table_format")
         return table_format == detected_format
+
+    def drop_columns(
+        self,
+        from_tables_drop_cols: List[Dict[str, Union[str, List[str]]]],
+        update_schema: bool = True,
+    ) -> None:
+        for table_spec in from_tables_drop_cols:
+            table_name = cast(str, table_spec["from_table"])
+            columns_to_drop = list(table_spec["drop_columns"])
+
+            if self.is_open_table("iceberg", table_name):
+                ice_table = self.load_open_table("iceberg", table_name)
+                # Alter schema – delete columns one by one (requires incompatible changes flag)
+                with ice_table.update_schema(allow_incompatible_changes=True) as update:
+                    for col in columns_to_drop:
+                        update.delete_column(col)
+
+            elif self.is_open_table("delta", table_name):
+                from dlt.common.libs.deltalake import drop_columns_delta_table
+
+                delta_table = self.load_open_table("delta", table_name)
+                # Drop columns by rewriting the table
+                drop_columns_delta_table(
+                    table=delta_table,
+                    columns_to_drop=columns_to_drop,
+                )
+            else:
+                # Handle regular filesystem tables (jsonl, parquet, csv)
+                self._drop_columns_from_regular_table(table_name, columns_to_drop)
+
+        if update_schema:
+            self._update_schema_in_storage(self.schema)
+
+    def _drop_columns_from_regular_table(self, table_name: str, columns_to_drop: List[str]) -> None:
+        from dlt.common.libs.pyarrow import pyarrow as pa
+
+        table_files = self.list_table_files(table_name)
+
+        for file_path in table_files:
+            file_ext = os.path.splitext(file_path)[1].lower()
+
+            if file_ext == ".parquet":
+                table = pa.parquet.read_table(self.make_remote_url(file_path))
+                columns_to_keep = [col for col in table.column_names if col not in columns_to_drop]
+                filtered_table = table.select(columns_to_keep)
+                with pa.parquet.ParquetWriter(
+                    self.make_remote_url(file_path), filtered_table.schema
+                ) as writer:
+                    writer.write_table(filtered_table)
+
+            elif file_ext == ".jsonl":
+                content = self.fs_client.read_text(file_path, encoding="utf-8")
+                lines = content.strip().split("\n")
+
+                filtered_lines = []
+                for line in lines:
+                    if line.strip():
+                        record = json.loads(line)
+                        for col in columns_to_drop:
+                            record.pop(col, None)
+                        filtered_lines.append(json.dumps(record))
+
+                self.fs_client.write_text(file_path, "\n".join(filtered_lines), encoding="utf-8")
+
+            elif file_ext == ".csv":
+                table = pa.csv.read_csv(self.make_remote_url(file_path))
+                columns_to_keep = [col for col in table.column_names if col not in columns_to_drop]
+                filtered_table = table.select(columns_to_keep)
+                pa.csv.write_csv(filtered_table, self.make_remote_url(file_path))
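
Note: for plain filesystem tables the data files themselves are rewritten in place. A local sketch of the parquet branch using a temporary file instead of dlt's fs_client, assuming pyarrow is installed (pq.write_table stands in for the ParquetWriter used above):

import pyarrow as pa
import pyarrow.parquet as pq

# build a small parquet file containing a column we want to remove (illustrative path)
path = "events.parquet"
pq.write_table(pa.table({"id": [1, 2], "value": [10.0, 20.0], "tmp_flag": [True, False]}), path)

# read, select the surviving columns, and overwrite the same file
table = pq.read_table(path)
keep = [c for c in table.column_names if c != "tmp_flag"]
pq.write_table(table.select(keep), path)

print(pq.read_table(path).column_names)  # ['id', 'value']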

dlt/destinations/impl/redshift/redshift.py

Lines changed: 15 additions & 1 deletion
@@ -13,7 +13,7 @@
 
 # from psycopg2.sql import SQL, Composed
 
-from typing import Dict, List, Optional, Sequence
+from typing import Dict, List, Optional, Sequence, cast, Union
 
 from dlt.common.destination.client import (
     FollowupJobRequest,
@@ -59,6 +59,20 @@ def _maybe_make_terminal_exception_from_data_error(
             return DatabaseTerminalException(pg_ex)
         return None
 
+    def drop_columns(self, from_tables_drop_cols: List[Dict[str, Union[str, List[str]]]]) -> None:
+        """Drops specified columns from specified tables if they exist"""
+
+        statements = []
+        for from_table_drop_cols in from_tables_drop_cols:
+            table = cast(str, from_table_drop_cols["from_table"])
+            for column in from_table_drop_cols["drop_columns"]:
+                statements.append(
+                    f"ALTER TABLE {self.make_qualified_table_name(table)} DROP COLUMN"
+                    f" {self.escape_column_name(column)};"
+                )
+
+        self.execute_many(statements)
+
 
 class RedshiftCopyFileLoadJob(CopyRemoteFileLoadJob):
     def __init__(
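
Note: Redshift takes the simplest route, one ALTER TABLE ... DROP COLUMN per column in each spec, executed via execute_many; the Synapse client further down follows the same pattern. A sketch of the statements produced for a sample payload, with plain quoted identifiers in place of the client's qualification and escaping (names are illustrative):

from typing import Dict, List, Union

from_tables_drop_cols: List[Dict[str, Union[str, List[str]]]] = [
    {"from_table": "events", "drop_columns": ["legacy_id", "tmp_flag"]},
]

statements = [
    f'ALTER TABLE {spec["from_table"]} DROP COLUMN "{column}";'
    for spec in from_tables_drop_cols
    for column in spec["drop_columns"]
]
print("\n".join(statements))
# ALTER TABLE events DROP COLUMN "legacy_id";
# ALTER TABLE events DROP COLUMN "tmp_flag";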

dlt/destinations/impl/sqlalchemy/db_api_client.py

Lines changed: 20 additions & 4 deletions
@@ -1,4 +1,16 @@
-from typing import Optional, Iterator, Any, Sequence, AnyStr, Union, Tuple, List, Dict, Set, cast
+from typing import (
+    Optional,
+    Iterator,
+    Any,
+    Sequence,
+    AnyStr,
+    Union,
+    Tuple,
+    List,
+    Dict,
+    Set,
+    cast,
+)
 from contextlib import contextmanager
 from functools import wraps
 import inspect
@@ -299,15 +311,19 @@ def drop_tables(self, *tables: str) -> None:
 
     def drop_columns(self, from_tables_drop_cols: List[Dict[str, Union[str, List[str]]]]) -> None:
         for from_table_drop_cols in from_tables_drop_cols:
-            table_name = from_table_drop_cols["from_table"]
+            table_name = cast(str, from_table_drop_cols["from_table"])
             drop_columns = from_table_drop_cols["drop_columns"]
 
-            tbl = sa.Table(table_name, self.metadata, schema=self.dataset_name, keep_existing=True)
+            # Reflect current table definition to fetch existing columns
+            tbl = self.reflect_table(table_name)
             existing_cols = {col.name for col in tbl.columns}
 
             for column in drop_columns:
                 if column in existing_cols:
-                    ddl = f"ALTER TABLE {self.make_qualified_table_name(table_name)} DROP COLUMN {self.escape_column_name(column)}"  # type: ignore[arg-type]
+                    ddl = (
+                        f"ALTER TABLE {self.make_qualified_table_name(table_name)} "
+                        f"DROP COLUMN {self.escape_column_name(column)}"
+                    )
                     self.execute_sql(ddl)
 
     def execute_sql(
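
Note: the sqlalchemy client now reflects the live table before dropping, so columns that are already gone are skipped. The reflect_table call above is the client's own helper; this minimal sketch uses plain SQLAlchemy reflection against an in-memory SQLite database instead (assumes SQLAlchemy 1.4+ and SQLite 3.35+ for DROP COLUMN support; names are illustrative):

import sqlalchemy as sa

# in-memory SQLite stands in for the destination
engine = sa.create_engine("sqlite://")
with engine.begin() as conn:
    conn.execute(sa.text("CREATE TABLE events (id INTEGER, value REAL, tmp_flag BOOLEAN)"))

# reflect the live table to learn which columns actually exist
tbl = sa.Table("events", sa.MetaData(), autoload_with=engine)
existing_cols = {col.name for col in tbl.columns}

for column in ["tmp_flag", "already_gone"]:
    if column in existing_cols:  # skip columns that are not present
        with engine.begin() as conn:
            conn.execute(sa.text(f'ALTER TABLE events DROP COLUMN "{column}"'))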

dlt/destinations/impl/synapse/sql_client.py

Lines changed: 14 additions & 0 deletions
@@ -1,3 +1,4 @@
+from typing import List, Dict, Union, cast
 from contextlib import suppress
 
 from dlt.destinations.impl.mssql.sql_client import PyOdbcMsSqlClient
@@ -14,3 +15,16 @@ def drop_tables(self, *tables: str) -> None:
         for statement in statements:
             with suppress(DatabaseUndefinedRelation):
                 self.execute_sql(statement)
+
+    def drop_columns(self, from_tables_drop_cols: List[Dict[str, Union[str, List[str]]]]) -> None:
+        """Drops specified columns from specified tables if they exist"""
+        statements = []
+        for from_table_drop_cols in from_tables_drop_cols:
+            table = cast(str, from_table_drop_cols["from_table"])
+            for column in from_table_drop_cols["drop_columns"]:
+                statements.append(
+                    f"ALTER TABLE {self.make_qualified_table_name(table)} DROP COLUMN"
+                    f" {self.escape_column_name(column)};"
+                )
+
+        self.execute_many(statements)

dlt/destinations/job_client_impl.py

Lines changed: 3 additions & 3 deletions
@@ -333,7 +333,7 @@ def drop_tables(self, *tables: str, delete_schema: bool = True) -> None:
     def drop_columns(
         self,
         from_tables_drop_cols: List[Dict[str, Union[str, List[str]]]],
-        delete_schema: bool = True,
+        update_schema: bool = True,
     ) -> None:
         """Drop columns in destination database and optionally delete the stored schema as well.
         Clients that support ddl transactions will have both operations performed in a single transaction.
@@ -345,8 +345,8 @@ def drop_columns(
         """
         with self.maybe_ddl_transaction():
            self.sql_client.drop_columns(from_tables_drop_cols)
-            if delete_schema:
-                self._delete_schema_in_storage(self.schema)
+            if update_schema:
+                self._update_schema_in_storage(self.schema)
 
     @contextlib.contextmanager
     def maybe_ddl_transaction(self) -> Iterator[None]:
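
Note: the keyword rename from delete_schema to update_schema changes the post-drop behavior: the stored schema is now re-written rather than deleted. A hedged end-to-end sketch; it assumes the chosen destination's job client and SQL client support drop_columns (this commit adds or adjusts that support for several destinations) and uses illustrative pipeline, table, and column names:

import dlt

pipeline = dlt.pipeline(pipeline_name="drop_cols_demo", destination="duckdb")

# job clients are context managers; drop_columns runs inside a DDL transaction
# where the destination supports one
with pipeline.destination_client() as client:
    client.drop_columns(
        [{"from_table": "events", "drop_columns": ["tmp_flag"]}],
        update_schema=True,  # persist the updated schema instead of deleting it
    )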

dlt/load/utils.py

Lines changed: 1 addition & 1 deletion
@@ -185,7 +185,7 @@ def _init_dataset_and_update_schema(
                 f"Client for {job_client.config.destination_type} will drop columns"
                 f" from tables {table_names} {staging_text}"
             )
-            job_client.drop_columns(from_tables_drop_cols, delete_schema=True)
+            job_client.drop_columns(from_tables_drop_cols, update_schema=True)
         else:
             logger.warning(
                 f"Client for {job_client.config.destination_type} does not implement drop columns."
