From 93ae6fbf866df0a4d289769dce62fea192defc39 Mon Sep 17 00:00:00 2001 From: Mathias Fussenegger Date: Mon, 4 Aug 2025 18:13:27 +0200 Subject: [PATCH] Extend rolling upgrade tests to cover insert with primary on new version --- tests/bwc/test_rolling_upgrade.py | 45 ++++++++++++++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/tests/bwc/test_rolling_upgrade.py b/tests/bwc/test_rolling_upgrade.py index 53fa50e9..6e9fff5d 100644 --- a/tests/bwc/test_rolling_upgrade.py +++ b/tests/bwc/test_rolling_upgrade.py @@ -1,8 +1,9 @@ import unittest from crate.client import connect from crate.client.exceptions import ProgrammingError +from cr8.run_crate import parse_version -from crate.qa.tests import NodeProvider, insert_data, wait_for_active_shards, UpgradePath +from crate.qa.tests import NodeProvider, insert_data, wait_for_active_shards, UpgradePath, assert_busy ROLLING_UPGRADES_V4 = ( # 4.0.0 -> 4.0.1 -> 4.0.2 don't support rolling upgrades due to a bug @@ -255,6 +256,48 @@ def _test_rolling_upgrade(self, path, nodes): # Add the shards of the new partition primaries expected_active_shards += shards + + # Ensure inserts also work with the primary on the newer node + c.execute("alter table t1 set (number_of_replicas = 0)") + c.execute(""" + select + node['id'], + (select version['number'] from sys.nodes where id = node['id']) as version + from + sys.shards + where + table_name = 't1' + and id = 0 + and primary = true + and state = 'STARTED' + """ + ) + primary_node_id, primary_version = c.fetchone() + c.execute( + "select id from sys.nodes where id != ? and version['number'] != ? limit 1", + [primary_node_id, primary_version] + ) + node_ids = list(c.fetchall()) + if node_ids and parse_version(primary_version) < new_node.version: + alt_node_id = node_ids[0][0] + c.execute("ALTER TABLE t1 REROUTE MOVE SHARD 0 FROM ? TO ?", [primary_node_id, alt_node_id]) + c.execute("alter table t1 set (number_of_replicas = ?)", [replicas]) + # insert a few records to ensure shard 0 is used + c.execute("INSERT INTO doc.t1 (value) VALUES (?)", bulk_parameters=[[42]] * 10) + else: + # this was the last node upgraded to the new version, no node with old version left + c.execute("alter table t1 set (number_of_replicas = ?)", [replicas]) + + def check_health(): + c.execute("select health, table_name, underreplicated_shards from sys.health") + health_result = list(c.fetchall()) + for health, table_name, underreplicated in health_result: + self.assertEqual(health, "GREEN", f"{table_name} health must be green") + self.assertEqual(underreplicated, 0, f"{table_name} must not have any underreplicated shards") + + assert_busy(check_health, timeout=20) + + # Ensure table/partition versions created are correct if int(path.from_version.split('.')[0]) >= 5: c.execute("insert into doc.t2(a, b) values (?, ?)", [idx, idx])