Skip to content

Parallelize test_sql.py - Issue #60378 #61551

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 173 additions & 11 deletions pandas/tests/io/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,11 +601,11 @@ def drop_view(


@pytest.fixture
def mysql_pymysql_engine():
def mysql_pymysql_engine(worker_name):
sqlalchemy = pytest.importorskip("sqlalchemy")
pymysql = pytest.importorskip("pymysql")
engine = sqlalchemy.create_engine(
"mysql+pymysql://root@localhost:3306/pandas",
f"mysql+pymysql://root@localhost:3306/pandas{worker_name}",
connect_args={"client_flag": pymysql.constants.CLIENT.MULTI_STATEMENTS},
poolclass=sqlalchemy.pool.NullPool,
)
Expand Down Expand Up @@ -649,11 +649,11 @@ def mysql_pymysql_conn_types(mysql_pymysql_engine_types):


@pytest.fixture
def postgresql_psycopg2_engine():
def postgresql_psycopg2_engine(worker_name):
sqlalchemy = pytest.importorskip("sqlalchemy")
pytest.importorskip("psycopg2")
engine = sqlalchemy.create_engine(
"postgresql+psycopg2://postgres:postgres@localhost:5432/pandas",
f"postgresql+psycopg2://postgres:postgres@localhost:5432/pandas{worker_name}",
poolclass=sqlalchemy.pool.NullPool,
)
yield engine
Expand Down Expand Up @@ -684,12 +684,12 @@ def postgresql_psycopg2_conn(postgresql_psycopg2_engine):


@pytest.fixture
def postgresql_adbc_conn():
def postgresql_adbc_conn(worker_name):
pytest.importorskip("pyarrow")
pytest.importorskip("adbc_driver_postgresql")
from adbc_driver_postgresql import dbapi

uri = "postgresql://postgres:postgres@localhost:5432/pandas"
uri = f"postgresql://postgres:postgres@localhost:5432/pandas{worker_name}"
with dbapi.connect(uri) as conn:
yield conn
for view in get_all_views(conn):
Expand Down Expand Up @@ -748,10 +748,10 @@ def postgresql_psycopg2_conn_types(postgresql_psycopg2_engine_types):


@pytest.fixture
def sqlite_str():
def sqlite_str(worker_name):
pytest.importorskip("sqlalchemy")
with tm.ensure_clean() as name:
yield f"sqlite:///{name}"
yield f"sqlite:///{name}{worker_name}"


@pytest.fixture
Expand Down Expand Up @@ -816,14 +816,14 @@ def sqlite_conn_types(sqlite_engine_types):
yield conn


@pytest.fixture
def sqlite_adbc_conn():
@pytest.fixture(scope="function")
def sqlite_adbc_conn(worker_name):
pytest.importorskip("pyarrow")
pytest.importorskip("adbc_driver_sqlite")
from adbc_driver_sqlite import dbapi

with tm.ensure_clean() as name:
uri = f"file:{name}"
uri = f"file:{name}{worker_name}"
with dbapi.connect(uri) as conn:
yield conn
for view in get_all_views(conn):
Expand Down Expand Up @@ -894,6 +894,168 @@ def sqlite_buildin_types(sqlite_buildin, types_data):
return sqlite_buildin


@pytest.fixture(scope="session")
def worker_name(request):
    """
    Return a unique name per xdist worker, used to isolate tests for
    parallelization.

    :return: Name to use for creating/accessing an isolated SQL database
    :rtype: str
    """
    # Skips cleanly when pytest-xdist is not installed; with xdist the id
    # looks like "gw0"/"gw1", and a non-distributed run reports "master".
    return pytest.importorskip("xdist").get_xdist_worker_id(request)


@pytest.fixture(scope="session")
def create_engines():
    """
    Fixture factory. Returns a list of zero-argument engine builders,
    one per SQL backend, in the order [MySQL, Postgres].

    :return: create_engine_commands, callables that each build a fresh
        SQLAlchemy engine
    :rtype: list[function, function]
    """
    # Import indirectly, to avoid being picked up by dependency scanning software.
    sqlalchemy = pytest.importorskip("sqlalchemy")
    pymysql = pytest.importorskip("pymysql")

    def _build_mysql_engine():
        # client_flag permits multi-statement execution per the flag's name.
        return sqlalchemy.create_engine(
            "mysql+pymysql://root@localhost:3306/pandas",
            connect_args={"client_flag": pymysql.constants.CLIENT.MULTI_STATEMENTS},
            poolclass=sqlalchemy.pool.NullPool,
        )

    def _build_postgres_engine():
        # AUTOCOMMIT so DDL statements run outside an explicit transaction.
        return sqlalchemy.create_engine(
            "postgresql+psycopg2://postgres:postgres@localhost:5432/pandas",
            poolclass=sqlalchemy.pool.NullPool,
            isolation_level="AUTOCOMMIT",
        )

    return [_build_mysql_engine, _build_postgres_engine]


@pytest.fixture(scope="session")
def build_db_string(worker_name):
    """
    Returns a list of queries used per SQL offering to create per-worker DBs.
    Index order matches ``create_engines``: 0 is MySQL, 1 is Postgres.

    :return: build_db_string_query
    :rtype: list[str, str]
    """
    db_name = f"pandas{worker_name}"
    return [
        f"CREATE DATABASE IF NOT EXISTS {db_name}",  # MySQL
        f"CREATE DATABASE {db_name}",  # Postgres
    ]


@pytest.fixture(scope="session")
def teardown_db_string(worker_name):
    """
    Returns a list of queries used per SQL offering to teardown per-worker DBs.
    Index order matches ``create_engines``: 0 is MySQL, 1 is Postgres.

    :return: teardown_db_string_query
    :rtype: list[str, str]
    """
    # The same DROP statement is valid for both MySQL and Postgres.
    drop_query = f"DROP DATABASE pandas{worker_name}"
    return [drop_query, drop_query]


@pytest.fixture(scope="session")
def number_of_connections(create_engines, build_db_string, teardown_db_string):
    """
    Asserts that there's parity between the number of strings and functions
    needed to create DBs. Used for round-robin scheduling of DB
    initialization and teardown.

    :return: len(build_db_string)
    :rtype: int
    """
    count = len(build_db_string)
    assert len(create_engines) == count
    assert len(teardown_db_string) == count
    return count


@pytest.fixture(scope="session")
def round_robin_order(worker_number, number_of_connections):
    """
    Round-robin ordering of threads to initialize their own DB, equalizing
    connectivity burden between each SQL engine.

    :return: rr_order, a modular ring, e.g. with 2 DBs, w1 gets [1,0],
        w2 gets [0,1], w3 gets [1,0], etc.
    :rtype: list[int]*number_of_connections
    """
    start = worker_number % number_of_connections
    return [
        (start + offset) % number_of_connections
        for offset in range(number_of_connections)
    ]


@pytest.fixture(scope="session")
def worker_number(worker_name):
    """
    Casts worker_name to an integer, making sure that with only one thread,
    or without xdist, DB connections are still made correctly.

    :return: worker_number, integer portion of worker_name, `1` if master.
    :rtype: int
    """
    # xdist worker ids look like "gw<N>"; strip the "gw" prefix to get <N>.
    if worker_name == 'master':
        return 1
    return int(worker_name[2:])


@pytest.fixture(scope="session")
def orphan_db_wrapper(request, create_engines, round_robin_order, teardown_db_string):
    """
    Registers a session finalizer that drops this worker's databases, so
    orphaned DBs are cleaned up even if the normal teardown does not run.

    Gets each thread's round-robin order, and connects to the appropriate
    SQL engine with the appropriate teardown query string.

    :return: None
    """
    sqlalchemy = pytest.importorskip("sqlalchemy")

    def take_care_of_orphan_dbs():
        # pytest invokes finalizers with no arguments, so the fixtures this
        # cleanup needs are requested by the enclosing fixture and closed
        # over here (the original passed them as required parameters, which
        # would raise TypeError at teardown time).
        for rr_order in round_robin_order:
            engine = create_engines[rr_order]()
            with engine.connect() as conn:
                conn.execute(sqlalchemy.text(teardown_db_string[rr_order]))
            engine.dispose()

    # The pytest API is ``addfinalizer`` -- ``add_finalizer`` does not exist
    # on FixtureRequest and would raise AttributeError.
    request.addfinalizer(take_care_of_orphan_dbs)



@pytest.fixture(scope="session")
def build_and_teardown_dbs(create_engines, round_robin_order, build_db_string, teardown_db_string):
    """
    Gets each thread's round-robin order, and connects to the appropriate
    SQL engine with the appropriate build db query string; drops the same
    DBs again after the session.

    :return: None
    """
    sqlalchemy = pytest.importorskip("sqlalchemy")

    def _execute_per_backend(queries):
        # One short-lived connection per backend, visited in this worker's
        # round-robin order; NullPool engines are disposed immediately.
        for idx in round_robin_order:
            engine = create_engines[idx]()
            with engine.connect() as conn:
                conn.execute(sqlalchemy.text(queries[idx]))
            engine.dispose()

    _execute_per_backend(build_db_string)
    yield
    # Teardown DBs
    _execute_per_backend(teardown_db_string)


@pytest.fixture(scope="session", autouse=True)
def execution_point(build_and_teardown_dbs):
    # Autouse anchor: requesting build_and_teardown_dbs here forces the
    # session-scoped DB creation/teardown to run for every worker, even for
    # tests that never request a DB fixture themselves.
    yield




mysql_connectable = [
pytest.param("mysql_pymysql_engine", marks=pytest.mark.db),
pytest.param("mysql_pymysql_conn", marks=pytest.mark.db),
Expand Down
Loading