Skip to content

Commit 4c1bafe

Browse files
committed
Initial commit with --from --columns enabled
1 parent f42921f commit 4c1bafe

File tree

9 files changed

+223
-40
lines changed

9 files changed

+223
-40
lines changed

dlt/cli/pipeline_command.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -343,8 +343,8 @@ def _display_pending_packages() -> Tuple[Sequence[str], Sequence[str]]:
343343
drop = DropCommand(p, **command_kwargs)
344344
if drop.is_empty:
345345
fmt.echo(
346-
"Could not select any resources to drop and no resource/source state to reset. Use"
347-
" the command below to inspect the pipeline:"
346+
"Could not select any resources or columns to drop and no resource/source state to"
347+
" reset. Use the command below to inspect the pipeline:"
348348
)
349349
fmt.echo(f"dlt pipeline -v {p.pipeline_name} info")
350350
if len(drop.info["warnings"]):
@@ -368,11 +368,20 @@ def _display_pending_packages() -> Tuple[Sequence[str], Sequence[str]]:
368368
drop.info["resource_names"],
369369
)
370370
)
371-
fmt.echo("%s: %s" % (fmt.style("Table(s) to drop", fg="green"), drop.info["tables"]))
371+
label = "Table(s) to be affected" if drop.from_tables_drop_cols else "Table(s) to drop"
372+
fmt.echo(f"{fmt.style(label, fg='green')}: {drop.info['tables']}")
372373
fmt.echo(
373-
"%s: %s"
374-
% (fmt.style("\twith data in destination", fg="green"), drop.info["tables_with_data"])
374+
f"{fmt.style('\twith data in destination', fg='green')}:"
375+
f" {drop.info['tables_with_data']}"
375376
)
377+
if drop.from_tables_drop_cols:
378+
fmt.echo(f"{fmt.style('Column(s) to be dropped', fg='green')}:")
379+
for from_table_drop_cols in drop.from_tables_drop_cols:
380+
table_name = from_table_drop_cols["from_table"]
381+
columns = from_table_drop_cols["drop_columns"]
382+
fmt.echo(f"{fmt.style('\tfrom table:', fg='green')} {table_name}")
383+
fmt.echo(f"{fmt.style('\tcolumns:', fg='green')} {columns}")
384+
376385
fmt.echo(
377386
"%s: %s"
378387
% (

dlt/cli/plugins.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ def configure_parser(self, pipe_cmd: argparse.ArgumentParser) -> None:
390390
help="Drop all resources found in schema. Supersedes [resources] argument.",
391391
)
392392
pipe_cmd_drop.add_argument(
393-
"--state-paths", nargs="*", help="State keys or json paths to drop", default=()
393+
"--state-paths", nargs="*", help="State keys or json paths to drop.", default=()
394394
)
395395
pipe_cmd_drop.add_argument(
396396
"--schema",
@@ -403,6 +403,18 @@ def configure_parser(self, pipe_cmd: argparse.ArgumentParser) -> None:
403403
help="Only wipe state for matching resources without dropping tables.",
404404
default=False,
405405
)
406+
pipe_cmd_drop.add_argument(
407+
"--from",
408+
dest="from_resources",
409+
nargs="*",
410+
help="(With --columns) Resource name to drop columns from.",
411+
)
412+
pipe_cmd_drop.add_argument(
413+
"--columns",
414+
nargs="*",
415+
help="(With --from) One or more column names to drop from the specified resource.",
416+
default=(),
417+
)
406418

407419
pipe_cmd_package = pipeline_subparsers.add_parser(
408420
"load-package",
@@ -425,6 +437,11 @@ def configure_parser(self, pipe_cmd: argparse.ArgumentParser) -> None:
425437
def execute(self, args: argparse.Namespace) -> None:
426438
if args.list_pipelines:
427439
pipeline_command_wrapper("list", "-", args.pipelines_dir, args.verbosity)
440+
elif (hasattr(args, "from_resources") and hasattr(args, "columns")) and (
441+
(args.from_resources and not args.columns) or (not args.from_resources and args.columns)
442+
):
443+
fmt.error("Please use --from and --columns together.")
444+
raise CliCommandException()
428445
else:
429446
command_kwargs = dict(args._get_kwargs())
430447
if not command_kwargs.get("pipeline_name"):

dlt/common/storages/load_package.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
cast,
2121
Any,
2222
Tuple,
23+
TYPE_CHECKING,
2324
)
2425
from typing_extensions import NotRequired
2526

@@ -53,6 +54,9 @@
5354
)
5455
from dlt.common.time import precise_time
5556

57+
if TYPE_CHECKING:
58+
from dlt.pipeline.drop import _FromTableDropCols
59+
5660
TJobFileFormat = Literal["sql", "reference", TLoaderFileFormat]
5761
"""Loader file formats with internal job types"""
5862
JOB_EXCEPTION_EXTENSION = ".exception"
@@ -71,6 +75,8 @@ class TPipelineStateDoc(TypedDict, total=False):
7175

7276

7377
class TLoadPackageDropTablesState(TypedDict):
78+
from_tables_drop_columns: NotRequired[List["_FromTableDropCols"]]
79+
"""List of tables and columns that are to be dropped from them"""
7480
dropped_tables: NotRequired[List[TTableSchema]]
7581
"""List of tables that are to be dropped from the schema and destination (i.e. when `refresh` mode is used)"""
7682
truncated_tables: NotRequired[List[TTableSchema]]

dlt/destinations/job_client_impl.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
Type,
1616
Iterable,
1717
Iterator,
18+
Union,
1819
)
1920
import zlib
2021
import re
@@ -329,6 +330,24 @@ def drop_tables(self, *tables: str, delete_schema: bool = True) -> None:
329330
if delete_schema:
330331
self._delete_schema_in_storage(self.schema)
331332

333+
def drop_columns(
    self,
    from_tables_drop_cols: List[Dict[str, Union[str, List[str]]]],
    delete_schema: bool = True,
) -> None:
    """Drop columns in destination database and optionally delete the stored schema as well.

    Clients that support ddl transactions will have both operations performed in a single transaction.

    Args:
        from_tables_drop_cols: List of mappings, each containing a "from_table" table name
            and a "drop_columns" list of column names to drop from that table.
        delete_schema: If True, also delete all versions of the current schema from storage.
    """
    with self.maybe_ddl_transaction():
        self.sql_client.drop_columns(from_tables_drop_cols)
        # deleting the stored schema inside the same (maybe-)transaction mirrors drop_tables
        if delete_schema:
            self._delete_schema_in_storage(self.schema)
350+
332351
@contextlib.contextmanager
333352
def maybe_ddl_transaction(self) -> Iterator[None]:
334353
"""Begins a transaction if sql client supports it, otherwise works in auto commit."""

dlt/destinations/sql_client.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@
1818
List,
1919
Generator,
2020
cast,
21+
Union,
2122
)
2223

2324
from dlt.common.typing import TFun, TypedDict, Self
24-
from dlt.common.schema.typing import TTableSchemaColumns
25+
from dlt.common.schema.typing import TTableSchemaColumns, TColumnSchema
2526
from dlt.common.destination import DestinationCapabilitiesContext
2627
from dlt.common.utils import concat_strings_with_limit
2728
from dlt.common.destination.client import JobClientBase
@@ -141,6 +142,20 @@ def drop_tables(self, *tables: str) -> None:
141142
]
142143
self.execute_many(statements)
143144

145+
def drop_columns(self, from_tables_drop_cols: List[Dict[str, Union[str, List[str]]]]) -> None:
    """Issue an ALTER TABLE ... DROP COLUMN IF EXISTS for every requested (table, column) pair."""

    statements: List[str] = []
    for entry in from_tables_drop_cols:
        # each entry names one table and the columns to remove from it
        qualified_table = self.make_qualified_table_name(cast(str, entry["from_table"]))
        statements.extend(
            f"ALTER TABLE {qualified_table} DROP COLUMN IF EXISTS"
            f" {self.escape_column_name(column_name)};"
            for column_name in entry["drop_columns"]
        )

    self.execute_many(statements)
158+
144159
def _to_named_paramstyle(self, query: str, args: Sequence[Any]) -> Tuple[str, Dict[str, Any]]:
145160
"""Convert a query from "format" ( %s ) paramstyle to "named" ( :param_name ) paramstyle.
146161
The %s are replaced with :arg0, :arg1, ... and the arguments are returned as a dictionary.

dlt/load/load.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import contextlib
22
from functools import reduce
3-
from typing import Dict, List, Optional, Tuple, Iterator, Sequence
3+
from typing import Dict, List, Optional, Tuple, Iterator, Sequence, Union
44
from concurrent.futures import Executor
55
import os
66

@@ -517,6 +517,7 @@ def load_single_package(self, load_id: str, schema: Schema) -> None:
517517
# and they must be like that in order to drop existing tables
518518
dropped_tables = current_load_package()["state"].get("dropped_tables", [])
519519
truncated_tables = current_load_package()["state"].get("truncated_tables", [])
520+
from_tables_drop_cols = current_load_package()["state"].get("from_tables_drop_columns", [])
520521

521522
self.init_jobs_counter(load_id)
522523

@@ -537,6 +538,7 @@ def load_single_package(self, load_id: str, schema: Schema) -> None:
537538
),
538539
drop_tables=dropped_tables,
539540
truncate_tables=truncated_tables,
541+
from_tables_drop_cols=from_tables_drop_cols,
540542
)
541543

542544
# init staging client

dlt/load/utils.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import List, Set, Iterable, Callable, Optional, Tuple, Sequence
1+
from typing import List, Set, Iterable, Callable, Optional, Tuple, Sequence, Dict, Union, Any
22
from itertools import groupby
33

44
from dlt.common import logger
@@ -11,7 +11,7 @@
1111
)
1212
from dlt.common.storages.load_storage import ParsedLoadJobFileName
1313
from dlt.common.schema import Schema, TSchemaTables
14-
from dlt.common.schema.typing import TTableSchema
14+
from dlt.common.schema.typing import TTableSchema, TColumnSchema
1515
from dlt.common.destination.client import JobClientBase, WithStagingDataset, LoadJob
1616
from dlt.load.configuration import LoaderConfiguration
1717
from dlt.common.destination import DestinationCapabilitiesContext
@@ -71,6 +71,7 @@ def init_client(
7171
load_staging_filter: Callable[[str], bool],
7272
drop_tables: Optional[List[TTableSchema]] = None,
7373
truncate_tables: Optional[List[TTableSchema]] = None,
74+
from_tables_drop_cols: Any = None,
7475
) -> TSchemaTables:
7576
"""Initializes destination storage including staging dataset if supported
7677
@@ -85,6 +86,7 @@ def init_client(
8586
load_staging_filter (Callable[[str], bool]): A filter which tell which table in the staging dataset may be loaded into
8687
drop_tables (Optional[List[TTableSchema]]): List of tables to drop before initializing storage
8788
truncate_tables (Optional[List[TTableSchema]]): List of tables to truncate before initializing storage
89+
drop_columns Optional[Dict[str, Union[List[TTableSchema], List[str]]]]: Columns to drop from specified tables
8890
8991
Returns:
9092
TSchemaTables: Actual migrations done at destination
@@ -113,13 +115,15 @@ def init_client(
113115

114116
# get tables to drop
115117
drop_table_names = {table["name"] for table in drop_tables} if drop_tables else set()
118+
116119
job_client.verify_schema(only_tables=tables_with_jobs | dlt_tables, new_jobs=new_jobs)
117120
applied_update = _init_dataset_and_update_schema(
118121
job_client,
119122
expected_update,
120123
tables_with_jobs | dlt_tables,
121124
truncate_table_names,
122125
drop_tables=drop_table_names,
126+
from_tables_drop_cols=from_tables_drop_cols,
123127
)
124128

125129
# update the staging dataset if client supports this
@@ -153,6 +157,7 @@ def _init_dataset_and_update_schema(
153157
truncate_tables: Iterable[str] = None,
154158
staging_info: bool = False,
155159
drop_tables: Iterable[str] = None,
160+
from_tables_drop_cols: List[Dict[str, Union[str, List[str]]]] = None,
156161
) -> TSchemaTables:
157162
staging_text = "for staging dataset" if staging_info else ""
158163
logger.info(
@@ -171,6 +176,22 @@ def _init_dataset_and_update_schema(
171176
f"Client for {job_client.config.destination_type} does not implement drop table."
172177
f" Following tables {drop_tables} will not be dropped {staging_text}"
173178
)
179+
if from_tables_drop_cols and job_client.is_storage_initialized():
180+
table_names = [
181+
from_table_drop_cols["from_table"] for from_table_drop_cols in from_tables_drop_cols
182+
]
183+
if hasattr(job_client, "drop_columns"):
184+
logger.info(
185+
f"Client for {job_client.config.destination_type} will drop columns"
186+
f" from tables {table_names} {staging_text}"
187+
)
188+
job_client.drop_columns(from_tables_drop_cols, delete_schema=True)
189+
else:
190+
logger.warning(
191+
f"Client for {job_client.config.destination_type} does not implement drop columns."
192+
" Columns will not be dropped from tables"
193+
f" {table_names} {staging_text}"
194+
)
174195

175196
job_client.initialize_storage()
176197

dlt/pipeline/drop.py

Lines changed: 79 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
)
1515
from dlt.common.typing import TypedDict
1616

17-
from dlt.common.schema.typing import TSimpleRegex, TTableSchema
17+
from dlt.common.schema.typing import TSimpleRegex, TTableSchema, TColumnSchema
1818
from dlt.common.schema.utils import (
1919
group_tables_by_resource,
2020
compile_simple_regexes,
@@ -35,14 +35,21 @@ class _DropInfo(TypedDict):
3535
drop_all: bool
3636
resource_pattern: Optional[REPattern]
3737
warnings: List[str]
38+
drop_columns: bool
39+
40+
41+
class _FromTableDropCols(TypedDict):
    """One entry of a column-drop request: a table name and the columns to drop from it."""

    # name of the table the columns are dropped from
    from_table: str
    # names of the columns to be dropped from that table
    drop_columns: List[str]
3844

3945

4046
@dataclass
class _DropResult:
    """Outcome of a drop operation: the mutated schema plus bookkeeping for the caller."""

    # schema after the requested tables/columns were removed (mutated in place)
    schema: Schema
    # summary of what was (or would be) dropped, including any warnings
    info: _DropInfo
    # tables with data in the destination affected by the operation
    modified_tables: List[TTableSchema]
    # modified pipeline state — set by resource drops, None for column drops
    state: Optional[TPipelineState] = None
    # per-table column-drop requests — set by column drops, None otherwise
    from_tables_drop_cols: Optional[List[_FromTableDropCols]] = None
4653

4754

4855
def _create_modified_state(
@@ -62,6 +69,7 @@ def _create_modified_state(
6269
for key in _get_matching_resources(resource_pattern, source_state):
6370
info["resource_states"].append(key)
6471
reset_resource_state(key, source_state)
72+
6573
# drop additional state paths
6674
# Don't drop 'resources' key if jsonpath is wildcard
6775
resolved_paths = [
@@ -153,6 +161,7 @@ def drop_resources(
153161
drop_all=drop_all,
154162
resource_pattern=resource_pattern,
155163
warnings=[],
164+
drop_columns=False,
156165
)
157166

158167
new_state, info = _create_modified_state(
@@ -170,4 +179,71 @@ def drop_resources(
170179
if not state_only:
171180
# drop only the selected tables
172181
schema.drop_tables(tables_to_drop_from_schema_names)
173-
return _DropResult(schema, new_state, info, tables_to_drop_from_dest)
182+
return _DropResult(schema, info, tables_to_drop_from_dest, new_state, None)
183+
184+
185+
def drop_columns(
    schema: Schema,
    from_resources: Union[Iterable[Union[str, TSimpleRegex]], Union[str, TSimpleRegex]],
    columns: Union[Iterable[Union[str, TSimpleRegex]], Union[str, TSimpleRegex]],
) -> _DropResult:
    """Remove the requested columns from `schema` and collect drop information.

    Args:
        schema: The schema to modify. Note that the schema is changed in place.
        from_resources: Resource name(s) or simple regex pattern(s) selecting the
            resources whose tables will have columns dropped. A bare string is
            treated as a single name, not iterated character by character.
        columns: Column name(s) to drop from the tables of the selected resources.
            A bare string is treated as a single column name.

    Returns:
        A `_DropResult` carrying the modified schema, a `_DropInfo` describing the
        operation, the affected tables with data in the destination, no pipeline
        state (column drops do not touch state), and the per-table list of dropped
        columns (None when no matching column was found).
    """
    # accept a single name as well as an iterable of names — a bare str would
    # otherwise be exploded into its characters by set()
    if isinstance(from_resources, str):
        from_resources = [from_resources]
    if isinstance(columns, str):
        columns = [columns]
    from_resources = set(from_resources)
    columns = set(columns)
    resource_pattern = compile_simple_regexes(TSimpleRegex(r) for r in from_resources)
    # all tables that are supposed to have data
    data_tables = {t["name"]: t for t in schema.data_tables(include_incomplete=True)}
    # resources with tables with data {resource: tables_with_data}
    resource_tables = group_tables_by_resource(data_tables, pattern=resource_pattern)
    resource_names = list(resource_tables.keys())
    # tables affected by the drop; reversed to mirror the processing order used by
    # drop_resources — presumably child tables before parents (TODO confirm)
    affected_tables = list(chain.from_iterable(resource_tables.values()))
    affected_tables.reverse()
    affected_table_names = [t["name"] for t in affected_tables]
    # tables that have actually seen data in the destination
    tables_with_data = [t for t in affected_tables if has_table_seen_data(t)]
    tables_with_data_names = [t["name"] for t in tables_with_data]

    # collect columns to drop per table, removing them from the schema in place
    collected: List[_FromTableDropCols] = []
    for table in tables_with_data:
        table_name = table["name"]
        table_schema_cols = table["columns"]
        drop_cols: List[str] = []

        for col in columns:
            if col in table_schema_cols:
                col_schema = schema.tables[table_name]["columns"].pop(col)
                drop_cols.append(col_schema["name"])

        if drop_cols:
            collected.append({"from_table": table_name, "drop_columns": drop_cols})

    # None signals that no columns need to be dropped
    from_tables_drop_cols: Optional[List[_FromTableDropCols]] = collected or None

    info: _DropInfo = dict(
        tables=affected_table_names if from_tables_drop_cols else [],
        tables_with_data=tables_with_data_names if from_tables_drop_cols else [],
        resource_states=[],
        state_paths=[],
        resource_names=resource_names if from_tables_drop_cols else [],
        schema_name=schema.name,
        drop_all=False,
        resource_pattern=resource_pattern,
        warnings=[],
        drop_columns=True,
    )

    return _DropResult(schema, info, tables_with_data, None, from_tables_drop_cols)

0 commit comments

Comments
 (0)