diff --git a/pandas/tests/io/test_sql.py b/pandas/tests/io/test_sql.py index ff06d04fc23bd..b0f0c2da55636 100644 --- a/pandas/tests/io/test_sql.py +++ b/pandas/tests/io/test_sql.py @@ -601,11 +601,11 @@ def drop_view( @pytest.fixture -def mysql_pymysql_engine(): +def mysql_pymysql_engine(worker_name): sqlalchemy = pytest.importorskip("sqlalchemy") pymysql = pytest.importorskip("pymysql") engine = sqlalchemy.create_engine( - "mysql+pymysql://root@localhost:3306/pandas", + f"mysql+pymysql://root@localhost:3306/pandas{worker_name}", connect_args={"client_flag": pymysql.constants.CLIENT.MULTI_STATEMENTS}, poolclass=sqlalchemy.pool.NullPool, ) @@ -649,11 +649,11 @@ def mysql_pymysql_conn_types(mysql_pymysql_engine_types): @pytest.fixture -def postgresql_psycopg2_engine(): +def postgresql_psycopg2_engine(worker_name): sqlalchemy = pytest.importorskip("sqlalchemy") pytest.importorskip("psycopg2") engine = sqlalchemy.create_engine( - "postgresql+psycopg2://postgres:postgres@localhost:5432/pandas", + f"postgresql+psycopg2://postgres:postgres@localhost:5432/pandas{worker_name}", poolclass=sqlalchemy.pool.NullPool, ) yield engine @@ -684,12 +684,12 @@ def postgresql_psycopg2_conn(postgresql_psycopg2_engine): @pytest.fixture -def postgresql_adbc_conn(): +def postgresql_adbc_conn(worker_name): pytest.importorskip("pyarrow") pytest.importorskip("adbc_driver_postgresql") from adbc_driver_postgresql import dbapi - uri = "postgresql://postgres:postgres@localhost:5432/pandas" + uri = f"postgresql://postgres:postgres@localhost:5432/pandas{worker_name}" with dbapi.connect(uri) as conn: yield conn for view in get_all_views(conn): @@ -748,10 +748,10 @@ def postgresql_psycopg2_conn_types(postgresql_psycopg2_engine_types): @pytest.fixture -def sqlite_str(): +def sqlite_str(worker_name): pytest.importorskip("sqlalchemy") - with tm.ensure_clean() as name: + with tm.ensure_clean(f"_{worker_name}") as name: yield f"sqlite:///{name}" @pytest.fixture @@ -816,14 +816,14 @@ def 
sqlite_conn_types(sqlite_engine_types): yield conn -@pytest.fixture -def sqlite_adbc_conn(): +@pytest.fixture(scope="function") +def sqlite_adbc_conn(worker_name): pytest.importorskip("pyarrow") pytest.importorskip("adbc_driver_sqlite") from adbc_driver_sqlite import dbapi - with tm.ensure_clean() as name: + with tm.ensure_clean(f"_{worker_name}") as name: uri = f"file:{name}" with dbapi.connect(uri) as conn: yield conn for view in get_all_views(conn): @@ -894,6 +894,168 @@ def sqlite_buildin_types(sqlite_buildin, types_data): return sqlite_buildin +@pytest.fixture(scope="session") +def worker_name(request): + """ + Returns a unique name per worker, in order to + isolate tests for parallelization. + :return: Name to use for creating/accessing an isolated SQL database + :rtype: str + """ + xdist = pytest.importorskip("xdist") + return xdist.get_xdist_worker_id(request) + + +@pytest.fixture(scope="session") +def create_engines(): + """ + Fixture factory. Returns a list of lambda functions. + :return: create_engine_commands, a list of lambda functions that build an SQLAlchemy engine + :rtype: list[function, function] + + :mockup: + create_engine_commands = [ + MySQL, + Postgres, + ] + """ + # Indirectly import dependencies. To avoid being picked up by dependency scanning software. + sqlalchemy = pytest.importorskip("sqlalchemy") + pymysql = pytest.importorskip("pymysql") + create_engine_commands = [ + lambda : sqlalchemy.create_engine("mysql+pymysql://root@localhost:3306/pandas", connect_args={"client_flag": pymysql.constants.CLIENT.MULTI_STATEMENTS}, poolclass=sqlalchemy.pool.NullPool), + lambda : sqlalchemy.create_engine("postgresql+psycopg2://postgres:postgres@localhost:5432/pandas", poolclass=sqlalchemy.pool.NullPool, isolation_level="AUTOCOMMIT") + ] + return create_engine_commands + + +@pytest.fixture(scope="session") +def build_db_string(worker_name): + """ + Returns a list of queries used per SQL offering (Postgres, MySQL) to create per-worker DBs. 
+ :return: build_db_string_query + :rtype: list[str, str] + + + :mockup: + build_db_string_query = [ + MySQL, + Postgres, + ] + """ + build_db_string_query = [ + f"""CREATE DATABASE IF NOT EXISTS pandas{worker_name}""", + f"""CREATE DATABASE pandas{worker_name}""", + ] + return build_db_string_query + + +@pytest.fixture(scope="session") +def teardown_db_string(worker_name): + """ + Returns a list of queries used per SQL offering (Postgres, MySQL) to tear down per-worker DBs. + :return: teardown_db_string_query + :rtype: list[str, str] + + + :mockup: + teardown_db_string_query = [ + MySQL, + Postgres, + ] + """ + teardown_db_string_query = [ + f"""DROP DATABASE pandas{worker_name}""", + f"""DROP DATABASE pandas{worker_name}""", + ] + return teardown_db_string_query + + +@pytest.fixture(scope="session") +def number_of_connections(create_engines, build_db_string, teardown_db_string): + """ + Asserts that there's parity between the number of strings and functions needed to create DBs. + Used for round-robin scheduling of DB initialization and teardown. + :return: len(build_db_string) + :rtype: int + """ + assert len(create_engines) == len(build_db_string) == len(teardown_db_string) + return len(build_db_string) + + +@pytest.fixture(scope="session") +def round_robin_order(worker_number, number_of_connections): + """ + Round-robin ordering of threads to initialize their own DB, equalizing connectivity burden between each SQL engine. + :return: rr_order, a modular ring, e.g. with 2 DBs, w1 gets [1,0], w2 gets [0,1], w3 gets [1,0], etc. + :rtype: list[int]*number_of_connections + """ + rr_order = [(worker_number+i) % number_of_connections for i in range(number_of_connections)] + return rr_order + + +@pytest.fixture(scope="session") +def worker_number(worker_name): + """ + Casts worker_name to an integer, making sure that with only one thread, or without xdist, DB connections are + still made correctly. 
+ :return: worker_number, integer portion of worker_name, `1` if master. + :rtype: int + """ + if worker_name == 'master': + worker_number = 1 + else: + worker_number = int(worker_name[2:]) + return worker_number + + +@pytest.fixture(scope="session") +def orphan_db_wrapper(request, create_engines, round_robin_order, teardown_db_string): + def take_care_of_orphan_dbs(): + """ + Gets each thread's round-robin order, and connects to the appropriate SQL engine with the + appropriate teardown query string. + :return: None + """ + sqlalchemy = pytest.importorskip("sqlalchemy") + for rr_order in round_robin_order: + engine = create_engines[rr_order]() + with engine.connect() as conn: + conn.execute(sqlalchemy.text(teardown_db_string[rr_order])) + engine.dispose() + request.addfinalizer(take_care_of_orphan_dbs) + + + +@pytest.fixture(scope="session") +def build_and_teardown_dbs(create_engines, round_robin_order, build_db_string, teardown_db_string): + """ + Gets each thread's round-robin order, and connects to the appropriate SQL engine with the + appropriate build db query string. + :return: None + """ + sqlalchemy = pytest.importorskip("sqlalchemy") + for rr_order in round_robin_order: + engine = create_engines[rr_order]() + with engine.connect() as conn: + conn.execute(sqlalchemy.text(build_db_string[rr_order])) + engine.dispose() + yield + # Teardown DBs + for rr_order in round_robin_order: + engine = create_engines[rr_order]() + with engine.connect() as conn: + conn.execute(sqlalchemy.text(teardown_db_string[rr_order])) + engine.dispose() + + +@pytest.fixture(scope="session", autouse=True) +def execution_point(build_and_teardown_dbs): + yield + + + + mysql_connectable = [ pytest.param("mysql_pymysql_engine", marks=pytest.mark.db), pytest.param("mysql_pymysql_conn", marks=pytest.mark.db),