Skip to content

Commit 423a0fe

Browse files
authored Dec 11, 2024
feat: Adds helper functions for migrations (apache#31303)
1 parent fd57fce commit 423a0fe

13 files changed

+234
-70
lines changed
 

‎superset/migrations/shared/constraints.py‎

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,8 @@
1919
from dataclasses import dataclass
2020

2121
from alembic import op
22-
from sqlalchemy.dialects.sqlite.base import SQLiteDialect # noqa: E402
2322
from sqlalchemy.engine.reflection import Inspector
2423

25-
from superset.migrations.shared.utils import has_table
2624
from superset.utils.core import generic_find_fk_constraint_name
2725

2826

@@ -73,23 +71,3 @@ def redefine(
7371
ondelete=on_delete,
7472
onupdate=on_update,
7573
)
76-
77-
78-
def drop_fks_for_table(table_name: str) -> None:
    """
    Remove every foreign key constraint defined on a table.

    Nothing is dropped when the database is SQLite or when the table does
    not exist.

    :param table_name: The table name to drop foreign key constraints for
    """
    bind = op.get_bind()
    inspector = Inspector.from_engine(bind)

    # sqlite doesn't like constraints, so it is skipped before the table lookup.
    if isinstance(bind.dialect, SQLiteDialect) or not has_table(table_name):
        return

    for constraint in inspector.get_foreign_keys(table_name):
        op.drop_constraint(constraint["name"], table_name, type_="foreignkey")

‎superset/migrations/shared/utils.py‎

Lines changed: 213 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,25 @@
2121
from typing import Any, Callable, Optional, Union
2222
from uuid import uuid4
2323

24-
import sqlalchemy as sa
2524
from alembic import op
26-
from sqlalchemy import inspect
25+
from sqlalchemy import Column, inspect
2726
from sqlalchemy.dialects.mysql.base import MySQLDialect
2827
from sqlalchemy.dialects.postgresql.base import PGDialect
28+
from sqlalchemy.dialects.sqlite.base import SQLiteDialect # noqa: E402
29+
from sqlalchemy.engine.reflection import Inspector
2930
from sqlalchemy.exc import NoSuchTableError
3031
from sqlalchemy.orm import Query, Session
32+
from sqlalchemy.sql.schema import SchemaItem
3133

3234
from superset.utils import json
3335

34-
logger = logging.getLogger(__name__)
36+
GREEN = "\033[32m"
37+
RESET = "\033[0m"
38+
YELLOW = "\033[33m"
39+
RED = "\033[31m"
40+
LRED = "\033[91m"
41+
42+
logger = logging.getLogger("alembic")
3543

3644
DEFAULT_BATCH_SIZE = int(os.environ.get("BATCH_SIZE", 1000))
3745

@@ -185,15 +193,208 @@ def has_table(table_name: str) -> bool:
185193
return table_exists
186194

187195

188-
def add_column_if_not_exists(table_name: str, column: sa.Column) -> None:
196+
def drop_fks_for_table(table_name: str) -> None:
    """
    Remove all foreign key constraints from a table, logging each one dropped.

    The call does nothing on SQLite or when the table does not exist.

    :param table_name: The table name to drop foreign key constraints for
    """
    bind = op.get_bind()
    inspector = Inspector.from_engine(bind)

    if isinstance(bind.dialect, SQLiteDialect):
        # sqlite doesn't like constraints
        return

    if not has_table(table_name):
        return

    for constraint in inspector.get_foreign_keys(table_name):
        fk_name = constraint["name"]
        logger.info(
            f"Dropping foreign key {GREEN}{fk_name}{RESET} from table {GREEN}{table_name}{RESET}..."
        )
        op.drop_constraint(fk_name, table_name, type_="foreignkey")
216+
217+
218+
def create_table(table_name: str, *columns: SchemaItem) -> None:
    """
    Create a database table unless one with the same name already exists.

    When the table is already present an informational message is logged and
    nothing else happens. Otherwise the table is created from the given name
    and schema items, with a log line before and after the creation.

    :param table_name: The name of the table to be created.
    :param columns: The schema items describing the table, passed through
        unchanged to alembic's ``op.create_table()``.
    """
    if not has_table(table_name=table_name):
        logger.info(f"Creating table {GREEN}{table_name}{RESET}...")
        op.create_table(table_name, *columns)
        logger.info(f"Table {GREEN}{table_name}{RESET} created.")
    else:
        logger.info(f"Table {LRED}{table_name}{RESET} already exists. Skipping...")
237+
238+
239+
def drop_table(table_name: str) -> None:
    """
    Drop a database table if it exists.

    A missing table only produces an informational log message. For an
    existing table, its foreign key constraints are removed first through
    ``drop_fks_for_table`` and the table itself is dropped afterwards.

    :param table_name: The name of the table to be dropped.
    """
    if has_table(table_name=table_name):
        logger.info(f"Dropping table {GREEN}{table_name}{RESET}...")
        # Constraints go first, then the table.
        drop_fks_for_table(table_name)
        op.drop_table(table_name=table_name)
        logger.info(f"Table {GREEN}{table_name}{RESET} dropped.")
    else:
        logger.info(f"Table {GREEN}{table_name}{RESET} doesn't exist. Skipping...")
259+
260+
261+
def batch_operation(
    callable: Callable[[int, int], None], count: int, batch_size: int
) -> None:
    """
    Run an operation over ``count`` items in consecutive batches, logging progress.

    The callable is invoked once per batch with the start index (inclusive) and
    the end index (exclusive, capped at ``count``) of that batch. Progress is
    logged before every batch and once more when all batches are done.

    If count is set to 0 or lower, it logs an informational message and skips
    the batch process.

    :param callable: A callable function that takes two integer arguments:
        the start index and the end index of the current batch.
    :param count: The total number of items to process.
    :param batch_size: The number of items to process in each batch.
    :raises ValueError: If there are items to process and ``batch_size`` is
        not a positive integer.
    """
    # functools.partial objects and callable instances have no __name__;
    # fall back to repr so logging never fails after the work has run.
    name = getattr(callable, "__name__", repr(callable))

    if count <= 0:
        logger.info(
            f"No records to process in batch {LRED}(count <= 0){RESET} for callable {LRED}{name}{RESET}. Skipping..."
        )
        return

    # A negative step would yield an empty range and report success without
    # processing anything; a zero step makes range() itself raise.
    if batch_size <= 0:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

    for offset in range(0, count, batch_size):
        percentage = (offset / count) * 100
        logger.info(f"Progress: {offset:,}/{count:,} ({percentage:.2f}%)")
        callable(offset, min(offset + batch_size, count))

    logger.info(f"Progress: {count:,}/{count:,} (100%)")
    logger.info(
        f"End: {GREEN}{name}{RESET} batch operation {GREEN}successfully{RESET} executed."
    )
292+
293+
294+
def add_columns(table_name: str, *columns: Column) -> None:
    """
    Add the given columns to an existing table, skipping any already present.

    Each column is first checked against the table; a column that already
    exists is reported with an informational log message and left alone. The
    remaining columns are added inside a single Alembic ``batch_alter_table``
    context.

    :param table_name: The name of the table to which the columns will be added.
    :param columns: SQLAlchemy Column objects that define the name, type, and
        other attributes of the columns to be added.
    """
    pending = []
    for column in columns:
        if not table_has_column(table_name=table_name, column_name=column.name):
            pending.append(column)
            continue
        logger.info(
            f"Column {LRED}{column.name}{RESET} already present on table {LRED}{table_name}{RESET}. Skipping..."
        )

    with op.batch_alter_table(table_name) as batch_op:
        for column in pending:
            logger.info(
                f"Adding column {GREEN}{column.name}{RESET} to table {GREEN}{table_name}{RESET}..."
            )
            batch_op.add_column(column)
322+
323+
324+
def drop_columns(table_name: str, *columns: str) -> None:
    """
    Drop the named columns from an existing table, skipping any that are absent.

    Each name is first checked against the table; a column that does not exist
    is reported with an informational log message and ignored. The remaining
    columns are dropped inside a single Alembic ``batch_alter_table`` context.

    :param table_name: The name of the table from which the columns will be removed.
    :param columns: The names of the columns to be dropped.
    """
    present = []
    for column_name in columns:
        if table_has_column(table_name=table_name, column_name=column_name):
            present.append(column_name)
            continue
        logger.info(
            f"Column {LRED}{column_name}{RESET} is not present on table {LRED}{table_name}{RESET}. Skipping..."
        )

    with op.batch_alter_table(table_name) as batch_op:
        for column_name in present:
            logger.info(
                f"Dropping column {GREEN}{column_name}{RESET} from table {GREEN}{table_name}{RESET}..."
            )
            batch_op.drop_column(column_name)
352+
353+
354+
def create_index(table_name: str, index_name: str, *columns: str) -> None:
    """
    Create a named index over the given columns of an existing table.

    When the table already has an index with that name, an informational
    message is logged and nothing is created.

    :param table_name: The name of the table on which the index will be created.
    :param index_name: The name of the index to be created.
    :param columns: The names of the columns the index will cover.
    """
    if not table_has_index(table=table_name, index=index_name):
        logger.info(
            f"Creating index {GREEN}{index_name}{RESET} on table {GREEN}{table_name}{RESET}"
        )
        op.create_index(table_name=table_name, index_name=index_name, columns=columns)
        return

    logger.info(
        f"Table {LRED}{table_name}{RESET} already has index {LRED}{index_name}{RESET}. Skipping..."
    )
377+
378+
379+
def drop_index(table_name: str, index_name: str) -> None:
    """
    Drop a named index from an existing table.

    When the table has no index with that name, an informational message is
    logged and nothing is dropped.

    :param table_name: The name of the table from which the index will be dropped.
    :param index_name: The name of the index to be dropped.
    """
    if table_has_index(table=table_name, index=index_name):
        logger.info(
            f"Dropping index {GREEN}{index_name}{RESET} from table {GREEN}{table_name}{RESET}..."
        )
        op.drop_index(table_name=table_name, index_name=index_name)
        return

    logger.info(
        f"Table {LRED}{table_name}{RESET} doesn't have index {LRED}{index_name}{RESET}. Skipping..."
    )

‎superset/migrations/versions/2024-03-20_16-02_678eefb4ab44_add_access_token_table.py‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
from alembic import op # noqa: E402
3131
from sqlalchemy_utils import EncryptedType # noqa: E402
3232

33-
from superset.migrations.shared.constraints import drop_fks_for_table # noqa: E402
33+
from superset.migrations.shared.utils import drop_fks_for_table # noqa: E402
3434

3535

3636
def upgrade():

‎superset/migrations/versions/2024-04-01_22-44_c22cb5c2e546_user_attr_avatar_url.py‎

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,21 +22,20 @@
2222
"""
2323

2424
import sqlalchemy as sa
25-
from alembic import op
2625

27-
from superset.migrations.shared.utils import add_column_if_not_exists
26+
from superset.migrations.shared.utils import add_columns, drop_columns
2827

2928
# revision identifiers, used by Alembic.
3029
revision = "c22cb5c2e546"
3130
down_revision = "678eefb4ab44"
3231

3332

3433
def upgrade():
    """Add the nullable ``avatar_url`` column to the ``user_attribute`` table."""
    avatar_url = sa.Column("avatar_url", sa.String(length=100), nullable=True)
    add_columns("user_attribute", avatar_url)
3938

4039

4140
def downgrade():
    """Drop the ``avatar_url`` column from the ``user_attribute`` table."""
    table_name, column_name = "user_attribute", "avatar_url"
    drop_columns(table_name, column_name)

‎superset/migrations/versions/2024-04-11_15-41_5f57af97bc3f_add_catalog_column.py‎

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,8 @@
2323
"""
2424

2525
import sqlalchemy as sa
26-
from alembic import op
2726

28-
from superset.migrations.shared.utils import add_column_if_not_exists
27+
from superset.migrations.shared.utils import add_columns, drop_columns
2928

3029
# revision identifiers, used by Alembic.
3130
revision = "5f57af97bc3f"
@@ -36,12 +35,9 @@
3635

3736
def upgrade():
    """Add a nullable ``catalog`` column to each table named in ``tables``."""
    for table_name in tables:
        # Each table gets its own Column object.
        catalog = sa.Column("catalog", sa.String(length=256), nullable=True)
        add_columns(table_name, catalog)
4339

4440

4541
def downgrade():
    """Drop the ``catalog`` column from each table in ``tables``, in reverse order."""
    column_name = "catalog"
    for table_name in reversed(tables):
        drop_columns(table_name, column_name)

‎superset/migrations/versions/2024-05-01_10-52_58d051681a3b_add_catalog_perm_to_tables.py‎

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,32 +23,29 @@
2323
"""
2424

2525
import sqlalchemy as sa
26-
from alembic import op
2726

2827
from superset.migrations.shared.catalogs import (
2928
downgrade_catalog_perms,
3029
upgrade_catalog_perms,
3130
)
32-
from superset.migrations.shared.utils import add_column_if_not_exists
31+
from superset.migrations.shared.utils import add_columns, drop_columns
3332

3433
# revision identifiers, used by Alembic.
3534
revision = "58d051681a3b"
3635
down_revision = "4a33124c18ad"
3736

3837

3938
def upgrade():
    """
    Add a nullable ``catalog_perm`` column to ``tables`` and ``slices``, then
    run ``upgrade_catalog_perms`` for the ``postgresql`` engine.
    """
    for table_name in ("tables", "slices"):
        # Each table gets its own Column object.
        add_columns(
            table_name,
            sa.Column("catalog_perm", sa.String(length=1000), nullable=True),
        )
    upgrade_catalog_perms(engines={"postgresql"})
4946

5047

5148
def downgrade():
    """
    Run ``downgrade_catalog_perms`` for the ``postgresql`` engine, then drop
    the ``catalog_perm`` column from ``slices`` and ``tables``.
    """
    downgrade_catalog_perms(engines={"postgresql"})
    for table_name in ("slices", "tables"):
        drop_columns(table_name, "catalog_perm")

‎superset/migrations/versions/2024-05-24_11-31_02f4f7811799_remove_sl_dataset_columns_table.py‎

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,7 @@
2525
import sqlalchemy as sa
2626
from alembic import op
2727

28-
from superset.migrations.shared.constraints import drop_fks_for_table
29-
from superset.migrations.shared.utils import has_table
28+
from superset.migrations.shared.utils import drop_fks_for_table, has_table
3029

3130
# revision identifiers, used by Alembic.
3231
revision = "02f4f7811799"

0 commit comments

Comments
 (0)
Please sign in to comment.