From b99de1a1ff125878abd01c114ce064fa4a42b53e Mon Sep 17 00:00:00 2001 From: Danil Shcherbakov Date: Fri, 30 May 2025 21:14:27 +0100 Subject: [PATCH 1/3] finished test parallelization, cleaning up changes --- pandas/tests/io/test_sql.py | 155 +++++++++++++++++++++++++++++++++--- 1 file changed, 142 insertions(+), 13 deletions(-) diff --git a/pandas/tests/io/test_sql.py b/pandas/tests/io/test_sql.py index ff06d04fc23bd..ff94bf7ea0059 100644 --- a/pandas/tests/io/test_sql.py +++ b/pandas/tests/io/test_sql.py @@ -17,6 +17,7 @@ import numpy as np import pytest +import xdist from pandas._config import using_string_dtype @@ -601,11 +602,11 @@ def drop_view( @pytest.fixture -def mysql_pymysql_engine(): +def mysql_pymysql_engine(worker_name): sqlalchemy = pytest.importorskip("sqlalchemy") pymysql = pytest.importorskip("pymysql") engine = sqlalchemy.create_engine( - "mysql+pymysql://root@localhost:3306/pandas", + f"mysql+pymysql://root@localhost:3306/pandas{worker_name}", connect_args={"client_flag": pymysql.constants.CLIENT.MULTI_STATEMENTS}, poolclass=sqlalchemy.pool.NullPool, ) @@ -649,11 +650,11 @@ def mysql_pymysql_conn_types(mysql_pymysql_engine_types): @pytest.fixture -def postgresql_psycopg2_engine(): +def postgresql_psycopg2_engine(worker_name): sqlalchemy = pytest.importorskip("sqlalchemy") pytest.importorskip("psycopg2") engine = sqlalchemy.create_engine( - "postgresql+psycopg2://postgres:postgres@localhost:5432/pandas", + f"postgresql+psycopg2://postgres:postgres@localhost:5432/pandas{worker_name}", poolclass=sqlalchemy.pool.NullPool, ) yield engine @@ -684,12 +685,12 @@ def postgresql_psycopg2_conn(postgresql_psycopg2_engine): @pytest.fixture -def postgresql_adbc_conn(): +def postgresql_adbc_conn(worker_name): pytest.importorskip("pyarrow") pytest.importorskip("adbc_driver_postgresql") from adbc_driver_postgresql import dbapi - uri = "postgresql://postgres:postgres@localhost:5432/pandas" + uri = f"postgresql://postgres:postgres@localhost:5432/pandas{worker_name}" with dbapi.connect(uri) as conn: yield conn for view in get_all_views(conn): @@ -748,10 +749,10 @@ def postgresql_psycopg2_conn_types(postgresql_psycopg2_engine_types): @pytest.fixture -def sqlite_str(): +def sqlite_str(worker_name): pytest.importorskip("sqlalchemy") with tm.ensure_clean() as name: - yield f"sqlite:///{name}" + yield f"sqlite:///{name}{worker_name}" @pytest.fixture @@ -816,14 +817,14 @@ def sqlite_conn_types(sqlite_engine_types): yield conn -@pytest.fixture -def sqlite_adbc_conn(): +@pytest.fixture(scope="function") +def sqlite_adbc_conn(worker_name): pytest.importorskip("pyarrow") pytest.importorskip("adbc_driver_sqlite") from adbc_driver_sqlite import dbapi with tm.ensure_clean() as name: - uri = f"file:{name}" + uri = f"file:{name}{worker_name}" with dbapi.connect(uri) as conn: yield conn for view in get_all_views(conn): @@ -894,6 +895,131 @@ def sqlite_buildin_types(sqlite_buildin, types_data): return sqlite_buildin +@pytest.fixture(scope="session") +def worker_name(request): + """ + Creates a unique schema name for Postgres to use, in order to + isolate tests for parallelization. + :return: Name to use for creating an isolated schema + :rtype: str + """ + return xdist.get_xdist_worker_id(request) + + +@pytest.fixture(scope="session") +def create_engines(): + # Indirectly import dependencies. To avoid being picked up by depdency scanning software. + sqlalchemy = pytest.importorskip("sqlalchemy") + pymysql = pytest.importorskip("pymysql") + + # Round robin creation of DB connections. + create_engine_commands = [ + lambda : sqlalchemy.create_engine("mysql+pymysql://root@localhost:3306/pandas", connect_args={"client_flag": pymysql.constants.CLIENT.MULTI_STATEMENTS}, poolclass=sqlalchemy.pool.NullPool), + lambda : sqlalchemy.create_engine("postgresql+psycopg2://postgres:postgres@localhost:5432/pandas", poolclass=sqlalchemy.pool.NullPool, isolation_level="AUTOCOMMIT") + ] + return create_engine_commands + + +@pytest.fixture(scope="session") +def round_robin_ordering(worker_number): + round_robin_order = [(worker_number+i)%len(create_engine_commands) for i in range(len(create_engine_commands))] + + +@pytest.fixture(scope="session") +def worker_number(worker_name): + if worker_name == 'master': + worker_number = 1 + else: + worker_number = int(worker_name[2:]) + return worker_number + + +@pytest.fixture(scope="session") +def create_db_string(): + return [ + f"""CREATE DATABASE IF NOT EXISTS pandas{worker_name}""", + f"""CREATE DATABASE pandas{worker_name}""" + ] + + +@pytest.fixture(scope="session") +def execute_db_command(): + for i in range(len(create_engine_commands)): + engine=create_engines()[round_robin_order()[i]]() + connection = engine.connect() + connection.execute(sqlalchemy.text(create_db_string())) + + +@pytest.fixture(scope="session", autouse=True) +def prepare_db_setup(request, worker_name): + worker_number = worker_number + create_engine_commands = create_engines() + create_db_command = create_db_string() + assert len(create_engine_commands) == len(create_db_command) + + round_robin_order = round_robin_ordering() + + for i in range(len(create_engine_commands)): + engine = create_engine_commands[round_robin_order[i]]() + connection = engine.connect() + connection.execute(sqlalchemy.text(create_db_string[round_robin_order[i]])) + engine.dispose() + yield + teardown_db_string = [ + f"""DROP DATABASE IF EXISTS pandas{worker_name}""", + f"""DROP DATABASE IF EXISTS pandas{worker_name}""" + ] + + for i in range(len(create_engine_commands)): + engine = create_engine_commands[round_robin_order[i]]() + connection = engine.connect() + connection.execute(sqlalchemy.text(teardown_db_string[round_robin_order[i]])) + engine.dispose() + + + + +# @pytest.fixture(scope="session") +# def parallelize_mysql(): +# sqlalchemy = pytest.importorskip("sqlalchemy") +# pymysql = pytest.importorskip("pymysql") +# +# engine = sqlalchemy.create_engine( +# connection_string, +# connect_args={"client_flag": pymysql.constants.CLIENT.MULTI_STATEMENTS}, +# poolclass=sqlalchemy.pool.NullPool, +# ) +# with engine.connect() as connection: +# connection.execute(sqlalchemy.text( +# f""" +# CREATE DATABASE IF NOT EXISTS pandas{worker_name}; +# """ +# )) +# # connection.commit() +# # connection.close() +# yield +# engine.dispose() +# +# pass + + + + +# @pytest.fixture(scope="session", autouse=True) +# def set_up_dbs(parallelize_mysql_dbs, request): +# if hasattr(request.config, "workerinput"): +# # The tests are multi-threaded +# worker_name = xdist.get_xdist_worker_id(request) +# worker_count = request.config.workerinput["workercount"] +# print(worker_name, worker_count) +# parallelize_mysql_dbs(request, worker_name, worker_count) +# else: +# quit(1) + # parallelize_mysql_dbs + + + + mysql_connectable = [ pytest.param("mysql_pymysql_engine", marks=pytest.mark.db), pytest.param("mysql_pymysql_conn", marks=pytest.mark.db), @@ -978,8 +1104,11 @@ def sqlite_buildin_types(sqlite_buildin, types_data): sqlalchemy_connectable_types + ["sqlite_buildin_types"] + adbc_connectable_types ) - -@pytest.mark.parametrize("conn", all_connectable) +#TODO fix +@pytest.mark.parametrize("conn", [ + #pytest.param("mysql_pymysql_engine", marks=pytest.mark.db), + pytest.param("mysql_pymysql_conn", marks=pytest.mark.db), +]) def test_dataframe_to_sql(conn, test_frame1, request): # GH 51086 if conn is sqlite_engine conn = request.getfixturevalue(conn) From 297c91231a361bed13cc1b595f01adf02d6b6ff2 Mon Sep 17 00:00:00 2001 From: Danil Shcherbakov Date: Tue, 3 Jun 2025 08:12:20 +0100 Subject: [PATCH 2/3] Setup all proper DB config, ready for PR --- pandas/tests/io/test_sql.py | 207 +++++++++++++++++++++--------------- 1 file changed, 120 insertions(+), 87 deletions(-) diff --git a/pandas/tests/io/test_sql.py b/pandas/tests/io/test_sql.py index ff94bf7ea0059..54533364d9bd9 100644 --- a/pandas/tests/io/test_sql.py +++ b/pandas/tests/io/test_sql.py @@ -898,9 +898,9 @@ def sqlite_buildin_types(sqlite_buildin, types_data): @pytest.fixture(scope="session") def worker_name(request): """ - Creates a unique schema name for Postgres to use, in order to + Returns a unique name per worker, in order to isolate tests for parallelization. - :return: Name to use for creating an isolated schema + :return: Name to use for creating/accessing an isolated SQL database :rtype: str """ return xdist.get_xdist_worker_id(request) @@ -908,11 +908,20 @@ def worker_name(request): @pytest.fixture(scope="session") def create_engines(): - # Indirectly import dependencies. To avoid being picked up by depdency scanning software. + """ + Fixture factory. Returns a list of lambda functions. + :return: create_engine_commands, a list of lambda functions that build an SQLAlchemy engine + :rtype: list[function, function] + + :mockup: + create_engine_commands = [ + MySQL, + Postgres, + ] + """ + # Indirectly import dependencies. To avoid being picked up by dependency scanning software. sqlalchemy = pytest.importorskip("sqlalchemy") pymysql = pytest.importorskip("pymysql") - - # Round robin creation of DB connections. create_engine_commands = [ lambda : sqlalchemy.create_engine("mysql+pymysql://root@localhost:3306/pandas", connect_args={"client_flag": pymysql.constants.CLIENT.MULTI_STATEMENTS}, poolclass=sqlalchemy.pool.NullPool), lambda : sqlalchemy.create_engine("postgresql+psycopg2://postgres:postgres@localhost:5432/pandas", poolclass=sqlalchemy.pool.NullPool, isolation_level="AUTOCOMMIT") @@ -921,12 +930,78 @@ def create_engines(): @pytest.fixture(scope="session") -def round_robin_ordering(worker_number): - round_robin_order = [(worker_number+i)%len(create_engine_commands) for i in range(len(create_engine_commands))] +def build_db_string(worker_name): + """ + Returns a list of queries used per SQL offering (Postgres, MySQL) to create per-worker DBs. + :return: build_db_string_query + :rtype: list[str, str] + + + :mockup: + build_db_string_query = [ + MySQL, + Postgres, + ] + """ + build_db_string_query = [ + f"""CREATE DATABASE IF NOT EXISTS pandas{worker_name}""", + f"""CREATE DATABASE pandas{worker_name}""", + ] + return build_db_string_query + + +@pytest.fixture(scope="session") +def teardown_db_string(worker_name): + """ + Returns a list of queries used per SQL offering (Postgres, MySQL) to teardown per-worker DBs. + :return: teardown_db_string_query + :rtype: list[str, str] + + + :mockup: + teardown_db_string_query = [ + MySQL, + Postgres, + ] + """ + teardown_db_string_query = [ + f"""DROP DATABASE pandas{worker_name}""", + f"""DROP DATABASE pandas{worker_name}""", + ] + return teardown_db_string_query + + +@pytest.fixture(scope="session") +def number_of_connections(create_engines, build_db_string, teardown_db_string): + """ + Asserts that there's parity between the number of strings and functions needed to create DBs. + Used for round-robin scheduling of DB initialization and teardown. + :return: len(build_db_string) + :rtype: int + """ + assert len(create_engines) == len(build_db_string) == len(teardown_db_string) + return len(build_db_string) + + +@pytest.fixture(scope="session") +def round_robin_order(worker_number, number_of_connections): + """ + Round-robin ordering of threads to initialize their own DB, equalizing connectivitiy burden between each SQL engine. + :return: rr_order, a modular ring, e.g. with 2 DBs, w1 gets [1,0], w2 gets [0,1], w3 gets [1,0], etc. + :rtype: list[int]*number_of_connections + """ + rr_order = [(worker_number+i) % number_of_connections for i in range(number_of_connections)] + return rr_order @pytest.fixture(scope="session") def worker_number(worker_name): + """ + Casts worker_name to an integer, making sure that with only one thread, or without xdist, DB connections are + still made correctly. + :return: worker_number, integer portion of worker_name, `1` if master. + :rtype: int + """ if worker_name == 'master': worker_number = 1 else: @@ -935,87 +1010,48 @@ def worker_number(worker_name): @pytest.fixture(scope="session") -def create_db_string(): - return [ - f"""CREATE DATABASE IF NOT EXISTS pandas{worker_name}""", - f"""CREATE DATABASE pandas{worker_name}""" - ] +def orphan_db_wrapper(request): + def take_care_of_orphan_dbs(create_engines, round_robin_order, teardown_db_string): + """ + Gets each thread's round-robin order, and connects to the appropriate SQL engine with the + appropriate teardown query string. + :return: None + """ + sqlalchemy = pytest.importorskip("sqlalchemy") + for rr_order in round_robin_order: + engine = create_engines[rr_order]() + with engine.connect() as conn: + conn.execute(sqlalchemy.text(teardown_db_string[rr_order])) + engine.dispose() + request.add_finalizer(take_care_of_orphan_dbs) + @pytest.fixture(scope="session") -def execute_db_command(): - for i in range(len(create_engine_commands)): - engine=create_engines()[round_robin_order()[i]]() - connection = engine.connect() - connection.execute(sqlalchemy.text(create_db_string())) +def build_and_teardown_dbs(create_engines, round_robin_order, build_db_string, teardown_db_string): + """ + Gets each thread's round-robin order, and connects to the appropriate SQL engine with the + appropriate build db query string. + :return: None + """ + sqlalchemy = pytest.importorskip("sqlalchemy") + for rr_order in round_robin_order: + engine = create_engines[rr_order]() + with engine.connect() as conn: + conn.execute(sqlalchemy.text(build_db_string[rr_order])) + engine.dispose() + yield + # Teardown DBs + for rr_order in round_robin_order: + engine = create_engines[rr_order]() + with engine.connect() as conn: + conn.execute(sqlalchemy.text(teardown_db_string[rr_order])) + engine.dispose() @pytest.fixture(scope="session", autouse=True) -def prepare_db_setup(request, worker_name): - worker_number = worker_number - create_engine_commands = create_engines() - create_db_command = create_db_string() - assert len(create_engine_commands) == len(create_db_command) - - round_robin_order = round_robin_ordering() - - for i in range(len(create_engine_commands)): - engine = create_engine_commands[round_robin_order[i]]() - connection = engine.connect() - connection.execute(sqlalchemy.text(create_db_string[round_robin_order[i]])) - engine.dispose() - yield - teardown_db_string = [ - f"""DROP DATABASE IF EXISTS pandas{worker_name}""", - f"""DROP DATABASE IF EXISTS pandas{worker_name}""" - ] - - for i in range(len(create_engine_commands)): - engine = create_engine_commands[round_robin_order[i]]() - connection = engine.connect() - connection.execute(sqlalchemy.text(teardown_db_string[round_robin_order[i]])) - engine.dispose() - - - - -# @pytest.fixture(scope="session") -# def parallelize_mysql(): -# sqlalchemy = pytest.importorskip("sqlalchemy") -# pymysql = pytest.importorskip("pymysql") -# -# engine = sqlalchemy.create_engine( -# connection_string, -# connect_args={"client_flag": pymysql.constants.CLIENT.MULTI_STATEMENTS}, -# poolclass=sqlalchemy.pool.NullPool, -# ) -# with engine.connect() as connection: -# connection.execute(sqlalchemy.text( -# f""" -# CREATE DATABASE IF NOT EXISTS pandas{worker_name}; -# """ -# )) -# # connection.commit() -# # connection.close() -# yield -# engine.dispose() -# -# pass - - - - -# @pytest.fixture(scope="session", autouse=True) -# def set_up_dbs(parallelize_mysql_dbs, request): -# if hasattr(request.config, "workerinput"): -# # The tests are multi-threaded -# worker_name = xdist.get_xdist_worker_id(request) -# worker_count = request.config.workerinput["workercount"] -# print(worker_name, worker_count) -# parallelize_mysql_dbs(request, worker_name, worker_count) -# else: -# quit(1) - # parallelize_mysql_dbs +def execution_point(build_and_teardown_dbs): + yield @@ -1104,11 +1140,8 @@ def prepare_db_setup(request, worker_name): sqlalchemy_connectable_types + ["sqlite_buildin_types"] + adbc_connectable_types ) -#TODO fix -@pytest.mark.parametrize("conn", [ - #pytest.param("mysql_pymysql_engine", marks=pytest.mark.db), - pytest.param("mysql_pymysql_conn", marks=pytest.mark.db), -]) + +@pytest.mark.parametrize("conn", all_connectable) def test_dataframe_to_sql(conn, test_frame1, request): # GH 51086 if conn is sqlite_engine conn = request.getfixturevalue(conn) From 6da55aaed9d91ed676fc2b14ffa417750d55f41c Mon Sep 17 00:00:00 2001 From: Danil Shcherbakov Date: Wed, 4 Jun 2025 06:47:39 +0100 Subject: [PATCH 3/3] Parallelize test_sql.py - minor change to xdist import statement --- pandas/tests/io/test_sql.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandas/tests/io/test_sql.py b/pandas/tests/io/test_sql.py index 54533364d9bd9..b0f0c2da55636 100644 --- a/pandas/tests/io/test_sql.py +++ b/pandas/tests/io/test_sql.py @@ -17,7 +17,6 @@ import numpy as np import pytest -import xdist from pandas._config import using_string_dtype @@ -903,6 +902,7 @@ def worker_name(request): :return: Name to use for creating/accessing an isolated SQL database :rtype: str """ + xdist = pytest.importorskip("xdist") return xdist.get_xdist_worker_id(request)