Skip to content

Commit e8d59e4

Browse files
committed
feat: srw
1 parent 658ee07 commit e8d59e4

File tree

4 files changed

+789
-766
lines changed

4 files changed

+789
-766
lines changed

tests/integration/container/test_read_write_splitting.py

Lines changed: 140 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
import gc
16+
from typing import ClassVar
1617

1718
import pytest
1819
from sqlalchemy import PoolProxiedConnection
@@ -26,6 +27,7 @@
2627
from aws_advanced_python_wrapper.host_list_provider import RdsHostListProvider
2728
from aws_advanced_python_wrapper.sql_alchemy_connection_provider import \
2829
SqlAlchemyPooledConnectionProvider
30+
from aws_advanced_python_wrapper.utils.log import Logger
2931
from aws_advanced_python_wrapper.utils.properties import (Properties,
3032
WrapperProperties)
3133
from tests.integration.container.utils.conditions import (
@@ -51,6 +53,16 @@
5153
TestEnvironmentFeatures.BLUE_GREEN_DEPLOYMENT,
5254
TestEnvironmentFeatures.PERFORMANCE])
5355
class TestReadWriteSplitting:
56+
logger: ClassVar[Logger] = Logger(__name__)
57+
58+
# Plugin configurations
59+
@pytest.fixture(params=[
60+
("read_write_splitting", "read_write_splitting"),
61+
("srw", "srw")
62+
])
63+
def plugin_config(self, request):
64+
return request.param
65+
5466
@pytest.fixture(scope='class')
5567
def rds_utils(self):
5668
region: str = TestEnvironment.get_current().get_info().get_region()
@@ -62,9 +74,15 @@ def clear_caches(self):
6274
RdsHostListProvider._is_primary_cluster_id_cache.clear()
6375
RdsHostListProvider._cluster_ids_to_update.clear()
6476

65-
@pytest.fixture(scope='class')
66-
def props(self):
67-
p: Properties = Properties({"plugins": "read_write_splitting", "connect_timeout": 30, "autocommit": True})
77+
@pytest.fixture
78+
def props(self, plugin_config, conn_utils):
79+
plugin_name, plugin_value = plugin_config
80+
p: Properties = Properties({"plugins": plugin_value, "connect_timeout": 30, "autocommit": True})
81+
82+
# Add simple plugin specific configuration
83+
if plugin_name == "srw":
84+
WrapperProperties.SRW_WRITE_ENDPOINT.set(p, conn_utils.writer_cluster_host)
85+
WrapperProperties.SRW_READ_ENDPOINT.set(p, conn_utils.reader_cluster_host)
6886

6987
if TestEnvironmentFeatures.TELEMETRY_TRACES_ENABLED in TestEnvironment.get_current().get_features() \
7088
or TestEnvironmentFeatures.TELEMETRY_METRICS_ENABLED in TestEnvironment.get_current().get_features():
@@ -79,19 +97,26 @@ def props(self):
7997

8098
return p
8199

82-
@pytest.fixture(scope='class')
83-
def failover_props(self):
84-
return {
85-
"plugins": "read_write_splitting,failover", "connect_timeout": 10, "autocommit": True}
86-
87-
@pytest.fixture(scope='class')
100+
@pytest.fixture
101+
def failover_props(self, plugin_config, conn_utils):
102+
plugin_name, plugin_value = plugin_config
103+
props = {"plugins": f"{plugin_value},failover", "connect_timeout": 10, "autocommit": True}
104+
105+
# Add simple plugin specific configuration
106+
if plugin_name == "srw":
107+
WrapperProperties.SRW_WRITE_ENDPOINT.set(props, conn_utils.writer_cluster_host)
108+
WrapperProperties.SRW_READ_ENDPOINT.set(props, conn_utils.reader_cluster_host)
109+
110+
return props
111+
112+
@pytest.fixture
88113
def proxied_props(self, props, conn_utils):
89114
props_copy = props.copy()
90115
endpoint_suffix = TestEnvironment.get_current().get_proxy_database_info().get_instance_endpoint_suffix()
91116
WrapperProperties.CLUSTER_INSTANCE_HOST_PATTERN.set(props_copy, f"?.{endpoint_suffix}:{conn_utils.proxy_port}")
92117
return props_copy
93118

94-
@pytest.fixture(scope='class')
119+
@pytest.fixture
95120
def proxied_failover_props(self, failover_props, conn_utils):
96121
props_copy = failover_props.copy()
97122
endpoint_suffix = TestEnvironment.get_current().get_proxy_database_info().get_instance_endpoint_suffix()
@@ -133,7 +158,10 @@ def test_connect_to_writer__switch_read_only(
133158
assert reader_id == current_id
134159

135160
def test_connect_to_reader__switch_read_only(
136-
self, test_environment: TestEnvironment, test_driver: TestDriver, props, conn_utils, rds_utils):
161+
self, test_environment: TestEnvironment, test_driver: TestDriver, props, conn_utils, rds_utils, plugin_config):
162+
plugin_name, _ = plugin_config
163+
if plugin_name != "read_write_splitting":
164+
pytest.skip("Test only applies to read_write_splitting plugin: srw does not connect to instances")
137165
target_driver_connect = DriverHelper.get_connect_func(test_driver)
138166
reader_instance = test_environment.get_instances()[1]
139167
with AwsWrapperConnection.connect(
@@ -239,7 +267,13 @@ def test_set_read_only_true_in_transaction(
239267
@enable_on_features([TestEnvironmentFeatures.NETWORK_OUTAGES_ENABLED])
240268
@enable_on_num_instances(min_instances=3)
241269
def test_set_read_only_true__all_readers_down(
242-
self, test_environment: TestEnvironment, test_driver: TestDriver, proxied_props, conn_utils, rds_utils):
270+
self, test_environment: TestEnvironment, test_driver: TestDriver,
271+
proxied_props, conn_utils, rds_utils, plugin_config):
272+
plugin_name, _ = plugin_config
273+
if plugin_name != "read_write_splitting":
274+
#pytest.skip("Test only applies to read_write_splitting plugin")
275+
self.logger.debug(f"flagged as skippable.")
276+
243277
target_driver_connect = DriverHelper.get_connect_func(test_driver)
244278
connect_params = conn_utils.get_proxy_connect_params()
245279

@@ -313,7 +347,12 @@ def test_execute__old_connection(
313347
@enable_on_num_instances(min_instances=3)
314348
def test_failover_to_new_writer__switch_read_only(
315349
self, test_environment: TestEnvironment, test_driver: TestDriver,
316-
proxied_failover_props, conn_utils, rds_utils):
350+
proxied_failover_props, conn_utils, rds_utils, plugin_config):
351+
plugin_name, _ = plugin_config
352+
if plugin_name != "read_write_splitting":
353+
# pytest.skip("Test only applies to read_write_splitting plugin")
354+
self.logger.debug(f"flagged as skippable.")
355+
317356
target_driver_connect = DriverHelper.get_connect_func(test_driver)
318357
connect_params = conn_utils.get_proxy_connect_params()
319358

@@ -356,7 +395,12 @@ def test_failover_to_new_writer__switch_read_only(
356395
@disable_on_engines([DatabaseEngine.MYSQL])
357396
def test_failover_to_new_reader__switch_read_only(
358397
self, test_environment: TestEnvironment, test_driver: TestDriver,
359-
proxied_failover_props, conn_utils, rds_utils, plugins):
398+
proxied_failover_props, conn_utils, rds_utils, plugins, plugin_config):
399+
plugin_name, _ = plugin_config
400+
if plugin_name != "read_write_splitting":
401+
# Disabling the reader connection in srw, the srwReadEndpoint, results in defaulting to the writer not connecting to another reader.
402+
pytest.skip("Test only applies to read_write_splitting plugin: reader connection failover")
403+
360404
WrapperProperties.PLUGINS.set(proxied_failover_props, plugins)
361405
WrapperProperties.FAILOVER_MODE.set(proxied_failover_props, "reader-or-writer")
362406

@@ -406,7 +450,12 @@ def test_failover_to_new_reader__switch_read_only(
406450
@disable_on_engines([DatabaseEngine.MYSQL])
407451
def test_failover_reader_to_writer__switch_read_only(
408452
self, test_environment: TestEnvironment, test_driver: TestDriver,
409-
proxied_failover_props, conn_utils, rds_utils, plugins):
453+
proxied_failover_props, conn_utils, rds_utils, plugins, plugin_config):
454+
plugin_name, _ = plugin_config
455+
if plugin_name != "read_write_splitting":
456+
# pytest.skip("Test only applies to read_write_splitting plugin")
457+
self.logger.debug(f"flagged as skippable.")
458+
410459
WrapperProperties.PLUGINS.set(proxied_failover_props, plugins)
411460
target_driver_connect = DriverHelper.get_connect_func(test_driver)
412461
with AwsWrapperConnection.connect(
@@ -526,7 +575,12 @@ def test_pooled_connection__cluster_url_failover(
526575
@disable_on_engines([DatabaseEngine.MYSQL])
527576
def test_pooled_connection__failover_failed(
528577
self, test_environment: TestEnvironment, test_driver: TestDriver,
529-
rds_utils, conn_utils, proxied_failover_props, plugins):
578+
rds_utils, conn_utils, proxied_failover_props, plugins, plugin_config):
579+
plugin_name, _ = plugin_config
580+
if plugin_name != "read_write_splitting":
581+
# pytest.skip("Test only applies to read_write_splitting plugin")
582+
self.logger.debug(f"flagged as skippable.")
583+
530584
writer_host = test_environment.get_writer().get_host()
531585
provider = SqlAlchemyPooledConnectionProvider(lambda _, __: {"pool_size": 1}, None, lambda host_info, props: writer_host in host_info.host)
532586
ConnectionProviderManager.set_connection_provider(provider)
@@ -651,7 +705,11 @@ def test_pooled_connection__different_users(
651705

652706
@enable_on_num_instances(min_instances=5)
653707
def test_pooled_connection__least_connections(
654-
self, test_environment: TestEnvironment, test_driver: TestDriver, rds_utils, conn_utils, props):
708+
self, test_environment: TestEnvironment, test_driver: TestDriver, rds_utils, conn_utils, props, plugin_config):
709+
plugin_name, _ = plugin_config
710+
if plugin_name != "read_write_splitting":
711+
pytest.skip("Test only applies to read_write_splitting plugin: reader host selector strategy")
712+
655713
WrapperProperties.READER_HOST_SELECTOR_STRATEGY.set(props, "least_connections")
656714

657715
instances = test_environment.get_instances()
@@ -675,6 +733,66 @@ def test_pooled_connection__least_connections(
675733
for conn in connections:
676734
conn.close()
677735

736+
def test_incorrect_reader_endpoint(
737+
self, test_environment: TestEnvironment, test_driver: TestDriver, conn_utils, rds_utils, plugin_config):
738+
plugin_name, plugin_value = plugin_config
739+
if plugin_name != "srw":
740+
pytest.skip("Test only applies to simple_read_write_splitting plugin: uses srwReadEndpoint property")
741+
742+
props = Properties({"plugins": plugin_value, "connect_timeout": 30, "autocommit": True})
743+
port = test_environment.get_info().get_database_info().get_cluster_endpoint_port()
744+
writer_endpoint = conn_utils.writer_cluster_host
745+
746+
# Set both endpoints to writer (incorrect reader endpoint)
747+
WrapperProperties.SRW_WRITE_ENDPOINT.set(props, f"{writer_endpoint}:{port}")
748+
WrapperProperties.SRW_READ_ENDPOINT.set(props, f"{writer_endpoint}:{port}")
749+
750+
target_driver_connect = DriverHelper.get_connect_func(test_driver)
751+
with AwsWrapperConnection.connect(
752+
target_driver_connect, **conn_utils.get_connect_params(conn_utils.writer_cluster_host), **props) as conn:
753+
writer_connection_id = rds_utils.query_instance_id(conn)
754+
755+
# Switch to reader successfully
756+
conn.read_only = True
757+
reader_connection_id = rds_utils.query_instance_id(conn)
758+
# Should stay on writer as fallback since reader endpoint points to a writer
759+
assert writer_connection_id == reader_connection_id
760+
761+
# Going to the write endpoint will be the same connection again
762+
conn.read_only = False
763+
final_connection_id = rds_utils.query_instance_id(conn)
764+
assert writer_connection_id == final_connection_id
765+
766+
def test_autocommit_state_preserved_across_connection_switches(
767+
self, test_environment: TestEnvironment, test_driver: TestDriver, props, conn_utils, rds_utils, plugin_config):
768+
plugin_name, _ = plugin_config
769+
if plugin_name != "srw":
770+
pytest.skip("Test only applies to simple_read_write_splitting plugin: autocommit impacts srw verification")
771+
772+
target_driver_connect = DriverHelper.get_connect_func(test_driver)
773+
with AwsWrapperConnection.connect(target_driver_connect, **conn_utils.get_connect_params(), **props) as conn:
774+
# Set autocommit to False on writer
775+
conn.autocommit = False
776+
assert conn.autocommit is False
777+
writer_connection_id = rds_utils.query_instance_id(conn)
778+
conn.commit()
779+
780+
# Switch to reader - autocommit should remain False
781+
conn.read_only = True
782+
assert conn.autocommit is False
783+
reader_connection_id = rds_utils.query_instance_id(conn)
784+
assert writer_connection_id != reader_connection_id
785+
786+
# Change autocommit on reader
787+
conn.autocommit = True
788+
assert conn.autocommit is True
789+
790+
# Switch back to writer - autocommit should be True
791+
conn.read_only = False
792+
assert conn.autocommit is True
793+
final_writer_connection_id = rds_utils.query_instance_id(conn)
794+
assert writer_connection_id == final_writer_connection_id
795+
678796
"""Tests custom pool mapping together with internal connection pools and the leastConnections
679797
host selection strategy. This test overloads one reader with connections and then verifies
680798
that new connections are sent to the other readers until their connection count equals that of
@@ -683,7 +801,11 @@ def test_pooled_connection__least_connections(
683801

684802
@enable_on_num_instances(min_instances=5)
685803
def test_pooled_connection__least_connections__pool_mapping(
686-
self, test_environment: TestEnvironment, test_driver: TestDriver, rds_utils, conn_utils, props):
804+
self, test_environment: TestEnvironment, test_driver: TestDriver, rds_utils, conn_utils, props, plugin_config):
805+
plugin_name, _ = plugin_config
806+
if plugin_name != "read_write_splitting":
807+
pytest.skip("Test only applies to read_write_splitting plugin: reader host selector strategy")
808+
687809
WrapperProperties.READER_HOST_SELECTOR_STRATEGY.set(props, "least_connections")
688810

689811
# We will be testing all instances excluding the writer and overloaded reader. Each instance

0 commit comments

Comments
 (0)