Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion tests/bwc/test_rolling_upgrade.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import unittest
from crate.client import connect
from crate.client.exceptions import ProgrammingError
from cr8.run_crate import parse_version

from crate.qa.tests import NodeProvider, insert_data, wait_for_active_shards, UpgradePath
from crate.qa.tests import NodeProvider, insert_data, wait_for_active_shards, UpgradePath, assert_busy

ROLLING_UPGRADES_V4 = (
# 4.0.0 -> 4.0.1 -> 4.0.2 don't support rolling upgrades due to a bug
Expand Down Expand Up @@ -255,6 +256,48 @@ def _test_rolling_upgrade(self, path, nodes):
# Add the shards of the new partition primaries
expected_active_shards += shards


# Ensure inserts also work with the primary on the newer node
c.execute("alter table t1 set (number_of_replicas = 0)")
c.execute("""
select
node['id'],
(select version['number'] from sys.nodes where id = node['id']) as version
from
sys.shards
where
table_name = 't1'
and id = 0
and primary = true
and state = 'STARTED'
"""
)
primary_node_id, primary_version = c.fetchone()
c.execute(
"select id from sys.nodes where id != ? and version['number'] != ? limit 1",
[primary_node_id, primary_version]
)
node_ids = list(c.fetchall())
if node_ids and parse_version(primary_version) < new_node.version:
alt_node_id = node_ids[0][0]
c.execute("ALTER TABLE t1 REROUTE MOVE SHARD 0 FROM ? TO ?", [primary_node_id, alt_node_id])
c.execute("alter table t1 set (number_of_replicas = ?)", [replicas])
# insert a few records to ensure shard 0 is used
c.execute("INSERT INTO doc.t1 (value) VALUES (?)", bulk_parameters=[[42]] * 10)
else:
# this was the last node upgraded to the new version, no node with old version left
c.execute("alter table t1 set (number_of_replicas = ?)", [replicas])

def check_health():
c.execute("select health, table_name, underreplicated_shards from sys.health")
health_result = list(c.fetchall())
for health, table_name, underreplicated in health_result:
self.assertEqual(health, "GREEN", f"{table_name} health must be green")
self.assertEqual(underreplicated, 0, f"{table_name} must not have any underreplicated shards")

assert_busy(check_health, timeout=20)


# Ensure table/partition versions created are correct
if int(path.from_version.split('.')[0]) >= 5:
c.execute("insert into doc.t2(a, b) values (?, ?)", [idx, idx])
Expand Down