Skip to content

Commit 161c379

Browse files
committed
Initial commit with --from --columns enabled
1 parent bd4e17b commit 161c379

File tree

9 files changed

+279
-36
lines changed

9 files changed

+279
-36
lines changed

dlt/cli/pipeline_command.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,3 +393,46 @@ def _display_pending_packages() -> Tuple[Sequence[str], Sequence[str]]:
393393
fmt.warning(warning)
394394
if fmt.confirm("Do you want to apply these changes?", default=False):
395395
drop()
396+
397+
if operation == "drop-columns":
398+
drop = DropCommand(p, **command_kwargs)
399+
if drop.is_empty:
400+
fmt.echo(
401+
"Could not select any columns to drop. Use"
402+
" the command below to inspect the pipeline:"
403+
)
404+
fmt.echo(f"dlt pipeline -v {p.pipeline_name} info")
405+
if len(drop.info["warnings"]):
406+
fmt.echo("Additional warnings are available")
407+
for warning in drop.info["warnings"]:
408+
fmt.warning(warning)
409+
return
410+
411+
fmt.echo(
412+
"About to drop the following columns in dataset %s in destination %s:"
413+
% (
414+
fmt.bold(p.dataset_name),
415+
fmt.bold(p.destination.destination_name),
416+
)
417+
)
418+
fmt.echo("%s: %s" % (fmt.style("Selected schema", fg="green"), drop.info["schema_name"]))
419+
fmt.echo(
420+
"%s: %s"
421+
% (
422+
fmt.style("Selected resource(s)", fg="green"),
423+
drop.info["resource_names"],
424+
)
425+
)
426+
fmt.echo("%s: %s" % (fmt.style("Table(s) to be affected", fg="green"), drop.info["tables"]))
427+
if drop.from_tables_drop_cols:
428+
fmt.echo(f"{fmt.style('Column(s) to be dropped', fg='green')}:")
429+
for from_table_drop_cols in drop.from_tables_drop_cols:
430+
table_name = from_table_drop_cols["from_table"]
431+
columns = from_table_drop_cols["drop_columns"]
432+
fmt.echo(f"\t{fmt.style('from table:', fg='green')} {table_name}")
433+
fmt.echo(f"\t\t{fmt.style('columns:', fg='green')} {columns}")
434+
435+
for warning in drop.info["warnings"]:
436+
fmt.warning(warning)
437+
if fmt.confirm("Do you want to apply these changes?", default=False):
438+
drop()

dlt/cli/plugins.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ def configure_parser(self, pipe_cmd: argparse.ArgumentParser) -> None:
390390
help="Drop all resources found in schema. Supersedes [resources] argument.",
391391
)
392392
pipe_cmd_drop.add_argument(
393-
"--state-paths", nargs="*", help="State keys or json paths to drop", default=()
393+
"--state-paths", nargs="*", help="State keys or json paths to drop.", default=()
394394
)
395395
pipe_cmd_drop.add_argument(
396396
"--schema",
@@ -422,6 +422,28 @@ def configure_parser(self, pipe_cmd: argparse.ArgumentParser) -> None:
422422
help="Load id of completed or normalized package. Defaults to the most recent package.",
423423
)
424424

425+
pipe_cmd_drop_columns = pipeline_subparsers.add_parser(
426+
"drop-columns",
427+
help="Selectively drop columns from specified tables",
428+
description="Selectively drop columns from the specified tables in the destination dataset.",
429+
epilog=(
430+
f"See {DLT_PIPELINE_COMMAND_DOCS_URL}#selectively-drop-tables-and-reset-state for"
431+
" more info"
432+
),
433+
)
434+
pipe_cmd_drop_columns.add_argument(
435+
"--from",
436+
dest="from_resources",
437+
nargs="*",
438+
help="(With --columns) Resource names to drop columns from.",
439+
)
440+
pipe_cmd_drop_columns.add_argument(
441+
"--columns",
442+
nargs="*",
443+
help="(With --from) Column names to drop from the specified resources.",
444+
default=(),
445+
)
446+
425447
def execute(self, args: argparse.Namespace) -> None:
426448
if args.list_pipelines:
427449
pipeline_command_wrapper("list", "-", args.pipelines_dir, args.verbosity)

dlt/common/storages/load_package.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ class TPipelineStateDoc(TypedDict, total=False):
7171

7272

7373
class TLoadPackageDropTablesState(TypedDict):
74+
from_tables_drop_columns: NotRequired[List[Any]]
75+
"""List of tables and columns that are to be dropped from them"""
7476
dropped_tables: NotRequired[List[TTableSchema]]
7577
"""List of tables that are to be dropped from the schema and destination (i.e. when `refresh` mode is used)"""
7678
truncated_tables: NotRequired[List[TTableSchema]]

dlt/destinations/job_client_impl.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
Type,
1616
Iterable,
1717
Iterator,
18+
Union,
1819
)
1920
import zlib
2021
import re
@@ -329,6 +330,24 @@ def drop_tables(self, *tables: str, delete_schema: bool = True) -> None:
329330
if delete_schema:
330331
self._delete_schema_in_storage(self.schema)
331332

333+
def drop_columns(
334+
self,
335+
from_tables_drop_cols: List[Dict[str, Union[str, List[str]]]],
336+
delete_schema: bool = True,
337+
) -> None:
338+
"""Drop columns in destination database and optionally delete the stored schema as well.
339+
Clients that support ddl transactions will have both operations performed in a single transaction.
340+
341+
Args:
342+
from_tables: Names of tables from which columns are to be dropped.
343+
from_tables_drop_cols: Names of columns to be dropped grouped by table.
344+
delete_schema: If True, also delete all versions of the current schema from storage
345+
"""
346+
with self.maybe_ddl_transaction():
347+
self.sql_client.drop_columns(from_tables_drop_cols)
348+
if delete_schema:
349+
self._delete_schema_in_storage(self.schema)
350+
332351
@contextlib.contextmanager
333352
def maybe_ddl_transaction(self) -> Iterator[None]:
334353
"""Begins a transaction if sql client supports it, otherwise works in auto commit."""

dlt/destinations/sql_client.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@
1818
List,
1919
Generator,
2020
cast,
21+
Union,
2122
)
2223

2324
from dlt.common.typing import TFun, TypedDict, Self
24-
from dlt.common.schema.typing import TTableSchemaColumns
25+
from dlt.common.schema.typing import TTableSchemaColumns, TColumnSchema
2526
from dlt.common.destination import DestinationCapabilitiesContext
2627
from dlt.common.utils import concat_strings_with_limit
2728
from dlt.common.destination.client import JobClientBase
@@ -141,6 +142,20 @@ def drop_tables(self, *tables: str) -> None:
141142
]
142143
self.execute_many(statements)
143144

145+
def drop_columns(self, from_tables_drop_cols: List[Dict[str, Union[str, List[str]]]]) -> None:
146+
"""Drops specified columns from specified tables if they exist"""
147+
148+
statements = []
149+
for from_table_drop_cols in from_tables_drop_cols:
150+
table = cast(str, from_table_drop_cols["from_table"])
151+
for column in from_table_drop_cols["drop_columns"]:
152+
statements.append(
153+
f"ALTER TABLE {self.make_qualified_table_name(table)} DROP COLUMN IF EXISTS"
154+
f" {self.escape_column_name(column)};"
155+
)
156+
157+
self.execute_many(statements)
158+
144159
def _to_named_paramstyle(self, query: str, args: Sequence[Any]) -> Tuple[str, Dict[str, Any]]:
145160
"""Convert a query from "format" ( %s ) paramstyle to "named" ( :param_name ) paramstyle.
146161
The %s are replaced with :arg0, :arg1, ... and the arguments are returned as a dictionary.

dlt/load/load.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import contextlib
22
from functools import reduce
3-
from typing import Dict, List, Optional, Tuple, Iterator, Sequence
3+
from typing import Dict, List, Optional, Tuple, Iterator, Sequence, Union
44
from concurrent.futures import Executor
55
import os
66

@@ -517,6 +517,7 @@ def load_single_package(self, load_id: str, schema: Schema) -> None:
517517
# and they must be like that in order to drop existing tables
518518
dropped_tables = current_load_package()["state"].get("dropped_tables", [])
519519
truncated_tables = current_load_package()["state"].get("truncated_tables", [])
520+
from_tables_drop_cols = current_load_package()["state"].get("from_tables_drop_columns", [])
520521

521522
self.init_jobs_counter(load_id)
522523

@@ -537,6 +538,7 @@ def load_single_package(self, load_id: str, schema: Schema) -> None:
537538
),
538539
drop_tables=dropped_tables,
539540
truncate_tables=truncated_tables,
541+
from_tables_drop_cols=from_tables_drop_cols,
540542
)
541543

542544
# init staging client

dlt/load/utils.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import List, Set, Iterable, Callable, Optional, Tuple, Sequence
1+
from typing import List, Set, Iterable, Callable, Optional, Tuple, Sequence, Dict, Union, Any
22
from itertools import groupby
33

44
from dlt.common import logger
@@ -11,7 +11,7 @@
1111
)
1212
from dlt.common.storages.load_storage import ParsedLoadJobFileName
1313
from dlt.common.schema import Schema, TSchemaTables
14-
from dlt.common.schema.typing import TTableSchema
14+
from dlt.common.schema.typing import TTableSchema, TColumnSchema
1515
from dlt.common.destination.client import JobClientBase, WithStagingDataset, LoadJob
1616
from dlt.load.configuration import LoaderConfiguration
1717
from dlt.common.destination import DestinationCapabilitiesContext
@@ -71,6 +71,7 @@ def init_client(
7171
load_staging_filter: Callable[[str], bool],
7272
drop_tables: Optional[List[TTableSchema]] = None,
7373
truncate_tables: Optional[List[TTableSchema]] = None,
74+
from_tables_drop_cols: Optional[List[Dict[str, Union[str, List[str]]]]] = None,
7475
) -> TSchemaTables:
7576
"""Initializes destination storage including staging dataset if supported
7677
@@ -85,6 +86,7 @@ def init_client(
8586
load_staging_filter (Callable[[str], bool]): A filter which tell which table in the staging dataset may be loaded into
8687
drop_tables (Optional[List[TTableSchema]]): List of tables to drop before initializing storage
8788
truncate_tables (Optional[List[TTableSchema]]): List of tables to truncate before initializing storage
89+
from_tables_drop_cols (Optional[List[Dict[str, Union[str, List[str]]]]]): List of columns to drop grouped by table.
8890
8991
Returns:
9092
TSchemaTables: Actual migrations done at destination
@@ -113,13 +115,15 @@ def init_client(
113115

114116
# get tables to drop
115117
drop_table_names = {table["name"] for table in drop_tables} if drop_tables else set()
118+
116119
job_client.verify_schema(only_tables=tables_with_jobs | dlt_tables, new_jobs=new_jobs)
117120
applied_update = _init_dataset_and_update_schema(
118121
job_client,
119122
expected_update,
120123
tables_with_jobs | dlt_tables,
121124
truncate_table_names,
122125
drop_tables=drop_table_names,
126+
from_tables_drop_cols=from_tables_drop_cols,
123127
)
124128

125129
# update the staging dataset if client supports this
@@ -153,6 +157,7 @@ def _init_dataset_and_update_schema(
153157
truncate_tables: Iterable[str] = None,
154158
staging_info: bool = False,
155159
drop_tables: Iterable[str] = None,
160+
from_tables_drop_cols: List[Dict[str, Union[str, List[str]]]] = None,
156161
) -> TSchemaTables:
157162
staging_text = "for staging dataset" if staging_info else ""
158163
logger.info(
@@ -171,6 +176,22 @@ def _init_dataset_and_update_schema(
171176
f"Client for {job_client.config.destination_type} does not implement drop table."
172177
f" Following tables {drop_tables} will not be dropped {staging_text}"
173178
)
179+
if from_tables_drop_cols and job_client.is_storage_initialized():
180+
table_names = [
181+
from_table_drop_cols["from_table"] for from_table_drop_cols in from_tables_drop_cols
182+
]
183+
if hasattr(job_client, "drop_columns"):
184+
logger.info(
185+
f"Client for {job_client.config.destination_type} will drop columns"
186+
f" from tables {table_names} {staging_text}"
187+
)
188+
job_client.drop_columns(from_tables_drop_cols, delete_schema=True)
189+
else:
190+
logger.warning(
191+
f"Client for {job_client.config.destination_type} does not implement drop columns."
192+
" Columns will not be dropped from tables"
193+
f" {table_names} {staging_text}"
194+
)
174195

175196
job_client.initialize_storage()
176197

0 commit comments

Comments
 (0)