Skip to content

Commit 604c2a0

Browse files
committed
Drop columns for sql client impls
1 parent fef2064 commit 604c2a0

File tree

2 files changed

+28
-0
lines changed

2 files changed

+28
-0
lines changed

dlt/destinations/impl/athena/athena.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
Iterable,
1313
Type,
1414
cast,
15+
Union,
1516
)
1617
from copy import deepcopy
1718
import re
@@ -231,6 +232,20 @@ def drop_tables(self, *tables: str) -> None:
231232
]
232233
self.execute_many(statements)
233234

235+
def drop_columns(self, from_tables_drop_cols: List[Dict[str, Union[str, List[str]]]]) -> None:
236+
"""Drops specified columns from specified tables. The statement will fail if the columns don't exist."""
237+
238+
statements = []
239+
for from_table_drop_cols in from_tables_drop_cols:
240+
table = cast(str, from_table_drop_cols["from_table"])
241+
for column in from_table_drop_cols["drop_columns"]:
242+
statements.append(
243+
f"ALTER TABLE {self.make_qualified_ddl_table_name(table)} DROP COLUMN"
244+
f" {self.escape_column_name(column)};"
245+
)
246+
247+
self.execute_many(statements)
248+
234249
@contextmanager
235250
@raise_database_error
236251
def begin_transaction(self) -> Iterator[DBTransaction]:

dlt/destinations/impl/sqlalchemy/db_api_client.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,19 @@ def drop_tables(self, *tables: str) -> None:
312312
tbl = sa.Table(table, self.metadata, schema=self.dataset_name, keep_existing=True)
313313
self.execute_sql(sa.schema.DropTable(tbl, if_exists=True))
314314

315+
def drop_columns(self, from_tables_drop_cols: List[Dict[str, Union[str, List[str]]]]) -> None:
316+
for from_table_drop_cols in from_tables_drop_cols:
317+
table_name = from_table_drop_cols["from_table"]
318+
drop_columns = from_table_drop_cols["drop_columns"]
319+
320+
tbl = sa.Table(table_name, self.metadata, schema=self.dataset_name, keep_existing=True)
321+
existing_cols = {col.name for col in tbl.columns}
322+
323+
for column in drop_columns:
324+
if column in existing_cols:
325+
ddl = f"ALTER TABLE {self.make_qualified_table_name(table_name)} DROP COLUMN {self.escape_column_name(column)}" # type: ignore[arg-type]
326+
self.execute_sql(ddl)
327+
315328
def execute_sql(
316329
self, sql: Union[AnyStr, sa.sql.Executable], *args: Any, **kwargs: Any
317330
) -> Optional[Sequence[Sequence[Any]]]:

0 commit comments

Comments
 (0)