
Commit c7248f0

filesystem table format logic added
1 parent 4f52769 commit c7248f0

File tree: 8 files changed, +174 -138 lines changed

dlt/cli/pipeline_command.py

Lines changed: 12 additions & 2 deletions
````diff
@@ -441,6 +441,10 @@ def _display_pending_packages() -> Tuple[Sequence[str], Sequence[str]]:
             fmt.echo("Additional warnings are available")
             for warning in drop.info["warnings"]:
                 fmt.warning(warning)
+        if len(drop.info["notes"]):
+            fmt.echo("Additional notes are available")
+            for note in drop.info["notes"]:
+                fmt.echo(fmt.style(note, fg="yellow"))
         return
 
         fmt.echo(
@@ -467,7 +471,13 @@ def _display_pending_packages() -> Tuple[Sequence[str], Sequence[str]]:
             fmt.echo(f"\t{fmt.style('from table:', fg='green')} {table_name}")
             fmt.echo(f"\t\t{fmt.style('columns:', fg='green')} {columns}")
 
-    for warning in drop.info["warnings"]:
-        fmt.warning(warning)
+    if len(drop.info["warnings"]):
+        fmt.echo("Additional warnings are available")
+        for warning in drop.info["warnings"]:
+            fmt.warning(warning)
+    if len(drop.info["notes"]):
+        fmt.echo("Additional info is available:")
+        for note in drop.info["notes"]:
+            fmt.echo(fmt.style(note, fg="yellow"))
     if fmt.confirm("Do you want to apply these changes?", default=False):
         drop()
````

dlt/cli/plugins.py

Lines changed: 43 additions & 10 deletions
````diff
@@ -324,11 +324,11 @@ def configure_parser(self, pipe_cmd: argparse.ArgumentParser) -> None:
 
 ```text
 About to drop the following data in dataset airflow_events_1 in destination dlt.destinations.duckdb:
-Selected schema:: github_repo_events
-Selected resource(s):: ['repo_events']
-Table(s) to drop:: ['issues_event', 'fork_event', 'pull_request_event', 'pull_request_review_event', 'pull_request_review_comment_event', 'watch_event', 'issue_comment_event', 'push_event__payload__commits', 'push_event']
-Resource(s) state to reset:: ['repo_events']
-Source state path(s) to reset:: []
+Selected schema: github_repo_events
+Selected resource(s): ['repo_events']
+Table(s) to drop: ['issues_event', 'fork_event', 'pull_request_event', 'pull_request_review_event', 'pull_request_review_comment_event', 'watch_event', 'issue_comment_event', 'push_event__payload__commits', 'push_event']
+Resource(s) state to reset: ['repo_events']
+Source state path(s) to reset: []
 Do you want to apply these changes? [y/N]
 ```
 
@@ -436,16 +436,49 @@ def configure_parser(self, pipe_cmd: argparse.ArgumentParser) -> None:
     "drop-columns",
     help="Selectively drop columns from specified tables",
     description="""
-Selectively drop columns from specified resources.
+Selectively drop columns from specified resources and tables.
 
 ```sh
-dlt pipeline <pipeline name> drop-columns --from [resource_1] [resource_2] --columns [column_1] [column_2]
+dlt pipeline <pipeline name> drop-columns --from-resources [resource_1] [resource_2] --from-tables [table_1] [table_2] --columns [column_1] [column_2]
 ```
 
-Drops selected columns in the tables generated by selected resources,
-unless the columns have a hint that makes them undroppable.
+**How column selection works:**
 
-You can use regexes to select resources and columns. Prepend the `re:` string to indicate a regex pattern.
+1. **Resource resolution**: If `--from-resources` is specified, tables are grouped by resource using regex pattern matching. If omitted, all resources are considered.
+
+2. **Table resolution**: If `--from-tables` is specified, only tables matching the pattern(s) within the selected resources are considered. If omitted, all tables from selected resources are considered.
+
+3. **Column resolution**: Columns are matched against the specified pattern(s) within the selected tables. Only nullable columns without disqualifying hints can be dropped.
+
+**Column safety rules:**
+
+Only columns that meet ALL of the following criteria can be dropped:
+- The column is nullable (can contain NULL values)
+- The column does not have any disqualifying hints such as: `partition`, `cluster`, `unique`, `sort`, `primary_key`, `row_key`, `parent_key`, `root_key`, `merge_key`, `variant`, `hard_delete`, `dedup_sort`, or `incremental`
+- After dropping the matched columns, at least one non-dlt internal column must remain in the table
+
+**Filesystem destination note:**
+
+For the filesystem destination, column dropping is only supported for tables that have an associated `table_format` (e.g., Iceberg, Delta). Tables without a table format will be skipped with a notification.
+
+**Example output:**
+
+```text
+About to drop the following columns in dataset my_dataset in destination dlt.destinations.duckdb:
+Selected schema: droppable_source
+Selected resource(s): ['droppable_b']
+Table(s) to be affected: ['droppable_b__items']
+Column(s) to be dropped:
+    from table: droppable_b__items
+        columns: ['m']
+Do you want to apply these changes?
+```
+
+**Pattern matching:**
+
+You can use regexes to select resources, tables and columns. Prepend the `re:` string to indicate a regex pattern. For example:
+- `--from-resources "re:^user"` matches all resources starting with "user"
+- `--columns "re:.*_temp$"` matches all columns ending with "_temp"
 """,
     epilog=(
         f"See {DLT_PIPELINE_COMMAND_DOCS_URL}#selectively-drop-tables-and-reset-state for"
````

dlt/destinations/impl/filesystem/filesystem.py

Lines changed: 0 additions & 40 deletions
````diff
@@ -964,46 +964,6 @@ def drop_columns(
                     table=delta_table,
                     columns_to_drop=columns_to_drop,
                 )
-            else:
-                # Handle regular filesystem tables (jsonl, parquet, csv)
-                self._drop_columns_from_regular_table(table_name, columns_to_drop)
 
         if update_schema:
             self._update_schema_in_storage(self.schema)
-
-    def _drop_columns_from_regular_table(self, table_name: str, columns_to_drop: List[str]) -> None:
-        from dlt.common.libs.pyarrow import pyarrow as pa
-
-        table_files = self.list_table_files(table_name)
-
-        for file_path in table_files:
-            file_ext = os.path.splitext(file_path)[1].lower()
-
-            if file_ext == ".parquet":
-                table = pa.parquet.read_table(self.make_remote_url(file_path))
-                columns_to_keep = [col for col in table.column_names if col not in columns_to_drop]
-                filtered_table = table.select(columns_to_keep)
-                with pa.parquet.ParquetWriter(
-                    self.make_remote_url(file_path), filtered_table.schema
-                ) as writer:
-                    writer.write_table(filtered_table)
-
-            elif file_ext == ".jsonl":
-                content = self.fs_client.read_text(file_path, encoding="utf-8")
-                lines = content.strip().split("\n")
-
-                filtered_lines = []
-                for line in lines:
-                    if line.strip():
-                        record = json.loads(line)
-                        for col in columns_to_drop:
-                            record.pop(col, None)
-                        filtered_lines.append(json.dumps(record))
-
-                self.fs_client.write_text(file_path, "\n".join(filtered_lines), encoding="utf-8")
-
-            elif file_ext == ".csv":
-                table = pa.csv.read_csv(self.make_remote_url(file_path))
-                columns_to_keep = [col for col in table.column_names if col not in columns_to_drop]
-                filtered_table = table.select(columns_to_keep)
-                pa.csv.write_csv(filtered_table, self.make_remote_url(file_path))
````

dlt/pipeline/drop.py

Lines changed: 19 additions & 2 deletions
````diff
@@ -41,6 +41,7 @@ class _DropInfo(TypedDict):
     drop_all: bool
     resource_pattern: Optional[REPattern]
     warnings: List[str]
+    notes: List[str]
     drop_columns: bool
 
 
@@ -171,6 +172,7 @@ def drop_resources(
         drop_all=drop_all,
         resource_pattern=resource_pattern,
         warnings=[],
+        notes=[],
        drop_columns=False,
    )
 
@@ -247,6 +249,7 @@ def drop_columns(
     from_resources: Union[Iterable[Union[str, TSimpleRegex]], Union[str, TSimpleRegex]] = (),
     from_tables: Union[Iterable[Union[str, TSimpleRegex]], Union[str, TSimpleRegex]] = (),
     columns: Union[Iterable[Union[str, TSimpleRegex]], Union[str, TSimpleRegex]] = (),
+    is_filesystem: bool = False,
 ) -> _DropResult:
     """Generate a new schema and pipeline state with the requested columns removed.
 
@@ -295,6 +298,7 @@ def drop_columns(
 
     # Collect columns to drop grouped by table
     warnings: List[str] = []
+    notes: List[str] = []
    from_tables_drop_cols: List[_FromTableDropCols] = []
    affected_schema_table_names: List[str] = []
 
@@ -308,8 +312,8 @@ def drop_columns(
 
         if not can_drop and len(matched_droppable_cols) > 0:
             warning = (
-                f"""After dropping matched droppable columns {matched_droppable_cols} from table '{table_name}'"""
-                " only internal dlt columns will remain. This is not allowed."
+                f"After dropping matched droppable columns {matched_droppable_cols} from table"
+                f" '{table_name}' only internal dlt columns will remain. This is not allowed."
             )
             warnings.append(warning)
             continue
@@ -320,6 +324,18 @@ def drop_columns(
         ]
 
         if drop_cols:
+            # Tables without a table format are not supported
+            if is_filesystem and "table_format" not in table:
+                drop_cols_str = ",".join(drop_cols)
+                note = (
+                    f"Skipped table '{table_name}' with selected column(s) '{drop_cols_str}'"
+                    " because it does not use a supported table format. Column dropping in"
+                    " filesystem destinations requires the table to have an associated table"
+                    " format."
+                )
+                notes.append(note)
+                continue
+
             from_tables_drop_cols.append({"from_table": table_name, "drop_columns": drop_cols})
             affected_schema_table_names.append(table_name)
 
@@ -348,6 +364,7 @@ def drop_columns(
         drop_all=False,
         resource_pattern=resource_pattern,
        warnings=warnings,
+        notes=notes,
        drop_columns=drop_columns,
    )
 
````
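
Restating the column safety rules from the docs as a standalone predicate may help when reading the hunks above. This is a minimal sketch: `is_droppable`, the tuple of hint names, and the plain-dict column representation are illustrative assumptions, not the actual logic in `drop.py`.

```python
from typing import Any, Dict

# hint names the docs list as disqualifying a column from being dropped
DISQUALIFYING_HINTS = (
    "partition", "cluster", "unique", "sort", "primary_key", "row_key",
    "parent_key", "root_key", "merge_key", "variant", "hard_delete",
    "dedup_sort", "incremental",
)

def is_droppable(column: Dict[str, Any]) -> bool:
    # a column must be nullable and carry none of the disqualifying hints;
    # the "at least one non-dlt column must remain" rule is checked per table,
    # not per column, so it is out of scope for this predicate
    if not column.get("nullable", False):
        return False
    return not any(column.get(hint) for hint in DISQUALIFYING_HINTS)

print(is_droppable({"name": "m", "nullable": True}))                        # True
print(is_droppable({"name": "id", "nullable": True, "primary_key": True}))  # False
```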

dlt/pipeline/helpers.py

Lines changed: 1 addition & 0 deletions
````diff
@@ -91,6 +91,7 @@ def __init__(
                 from_resources,
                 from_tables,
                 columns,
+                pipeline.destination.destination_name == "filesystem",
             )
         else:
             drop_result = drop_resources(
````
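
The one-line flag above, combined with the `table_format` check added in `drop.py`, amounts to the following gate, shown here as a hypothetical standalone helper for clarity:

```python
from typing import Any, Dict

def should_skip_column_drop(table: Dict[str, Any], destination_name: str) -> bool:
    # on the filesystem destination, only tables backed by a table format
    # (e.g. "delta" or "iceberg") support column drops; plain jsonl/parquet/csv
    # tables are skipped and reported via a note instead
    return destination_name == "filesystem" and "table_format" not in table

print(should_skip_column_drop({"name": "events"}, "filesystem"))                           # True
print(should_skip_column_drop({"name": "events", "table_format": "delta"}, "filesystem"))  # False
print(should_skip_column_drop({"name": "events"}, "duckdb"))                               # False
```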

docs/website/docs/reference/command-line-interface.md

Lines changed: 43 additions & 10 deletions
````diff
@@ -384,11 +384,11 @@ reset:
 
 ```text
 About to drop the following data in dataset airflow_events_1 in destination dlt.destinations.duckdb:
-Selected schema:: github_repo_events
-Selected resource(s):: ['repo_events']
-Table(s) to drop:: ['issues_event', 'fork_event', 'pull_request_event', 'pull_request_review_event', 'pull_request_review_comment_event', 'watch_event', 'issue_comment_event', 'push_event__payload__commits', 'push_event']
-Resource(s) state to reset:: ['repo_events']
-Source state path(s) to reset:: []
+Selected schema: github_repo_events
+Selected resource(s): ['repo_events']
+Table(s) to drop: ['issues_event', 'fork_event', 'pull_request_event', 'pull_request_review_event', 'pull_request_review_comment_event', 'watch_event', 'issue_comment_event', 'push_event__payload__commits', 'push_event']
+Resource(s) state to reset: ['repo_events']
+Source state path(s) to reset: []
 Do you want to apply these changes? [y/N]
 ```
 
@@ -501,16 +501,49 @@ dlt pipeline [pipeline_name] drop-columns [-h] [--from-resources [FROM_RESOURCES
 
 **Description**
 
-Selectively drop columns from specified resources.
+Selectively drop columns from specified resources and tables.
 
 ```sh
-dlt pipeline <pipeline name> drop-columns --from [resource_1] [resource_2] --columns [column_1] [column_2]
+dlt pipeline <pipeline name> drop-columns --from-resources [resource_1] [resource_2] --from-tables [table_1] [table_2] --columns [column_1] [column_2]
 ```
 
-Drops selected columns in the tables generated by selected resources,
-unless the columns have a hint that makes them undroppable.
+**How column selection works:**
 
-You can use regexes to select resources and columns. Prepend the `re:` string to indicate a regex pattern.
+1. **Resource resolution**: If `--from-resources` is specified, tables are grouped by resource using regex pattern matching. If omitted, all resources are considered.
+
+2. **Table resolution**: If `--from-tables` is specified, only tables matching the pattern(s) within the selected resources are considered. If omitted, all tables from selected resources are considered.
+
+3. **Column resolution**: Columns are matched against the specified pattern(s) within the selected tables. Only nullable columns without disqualifying hints can be dropped.
+
+**Column safety rules:**
+
+Only columns that meet ALL of the following criteria can be dropped:
+- The column is nullable (can contain NULL values)
+- The column does not have any disqualifying hints such as: `partition`, `cluster`, `unique`, `sort`, `primary_key`, `row_key`, `parent_key`, `root_key`, `merge_key`, `variant`, `hard_delete`, `dedup_sort`, or `incremental`
+- After dropping the matched columns, at least one non-dlt internal column must remain in the table
+
+**Filesystem destination note:**
+
+For the filesystem destination, column dropping is only supported for tables that have an associated `table_format` (e.g., Iceberg, Delta). Tables without a table format will be skipped with a notification.
+
+**Example output:**
+
+```text
+About to drop the following columns in dataset my_dataset in destination dlt.destinations.duckdb:
+Selected schema: droppable_source
+Selected resource(s): ['droppable_b']
+Table(s) to be affected: ['droppable_b__items']
+Column(s) to be dropped:
+    from table: droppable_b__items
+        columns: ['m']
+Do you want to apply these changes?
+```
+
+**Pattern matching:**
+
+You can use regexes to select resources, tables and columns. Prepend the `re:` string to indicate a regex pattern. For example:
+- `--from-resources "re:^user"` matches all resources starting with "user"
+- `--columns "re:.*_temp$"` matches all columns ending with "_temp".
 
 <details>
````
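
To tie the three resolution steps together, here is a compact sketch of the narrowing order (resources, then tables, then columns). `resolve` and its dict-shaped inputs are hypothetical stand-ins for dlt's actual schema traversal, written only to mirror the order described in the docs above.

```python
import re
from typing import Dict, List

def resolve(
    tables_by_resource: Dict[str, List[str]],
    columns_by_table: Dict[str, List[str]],
    from_resources: List[str],
    from_tables: List[str],
    columns: List[str],
) -> Dict[str, List[str]]:
    def matches(name: str, patterns: List[str]) -> bool:
        if not patterns:
            return True  # an omitted selector means "everything"
        return any(
            re.match(p[3:], name) if p.startswith("re:") else p == name
            for p in patterns
        )

    # narrow in three passes: resource -> table -> column
    result: Dict[str, List[str]] = {}
    for resource, tables in tables_by_resource.items():
        if not matches(resource, from_resources):
            continue
        for table in tables:
            if not matches(table, from_tables):
                continue
            hit = [c for c in columns_by_table.get(table, []) if matches(c, columns)]
            if hit:
                result[table] = hit
    return result

print(resolve(
    {"droppable_b": ["droppable_b", "droppable_b__items"]},
    {"droppable_b": ["id", "m"], "droppable_b__items": ["m", "value"]},
    ["droppable_b"], [], ["m"],
))
# {'droppable_b': ['m'], 'droppable_b__items': ['m']}
```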
