Skip to content

Commit 924d2e5

Browse files
authored
Removing tables from backup collection (#27068)
1 parent 719bc68 commit 924d2e5

File tree

1 file changed

+169
-0
lines changed

1 file changed

+169
-0
lines changed

ydb/tests/functional/backup_collection/basic_user_scenarios.py

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1514,3 +1514,172 @@ def q(role: str) -> str:
15141514
# cleanup exported data
15151515
if os.path.exists(export_dir):
15161516
shutil.rmtree(export_dir)
1517+
1518+
1519+
class TestIncrementalChainRestoreAfterDeletion(TestFullCycleLocalBackupRestore):
1520+
def _record_snapshot_and_rows(self, collection_src: str, t_orders: str, t_products: str,
1521+
created_snapshots: list, snapshot_rows: dict) -> str:
1522+
"""Record newest snapshot name and capture rows for orders/products."""
1523+
kids = sorted(self.get_collection_children(collection_src))
1524+
assert kids, "No snapshots found after backup"
1525+
last = kids[-1]
1526+
created_snapshots.append(last)
1527+
snapshot_rows[last] = {
1528+
"orders": self._capture_snapshot(t_orders),
1529+
"products": self._capture_snapshot(t_products),
1530+
}
1531+
return last
1532+
1533+
def _apply_sql_mutations(self, *sql_statements: str) -> None:
1534+
with self.session_scope() as session:
1535+
for s in sql_statements:
1536+
session.transaction().execute(s, commit_tx=True)
1537+
time.sleep(1.1)
1538+
1539+
def _import_exported_snapshots_up_to(self, coll_restore: str, export_dir: str, target_ts: str) -> list:
1540+
"""Import exported snapshot directories whose timestamp part <= target_ts into the restore collection."""
1541+
all_dirs = sorted([d for d in os.listdir(export_dir) if os.path.isdir(os.path.join(export_dir, d))])
1542+
chosen = [d for d in all_dirs if d.split("_", 1)[0] <= target_ts]
1543+
assert chosen, f"No exported snapshots with ts <= {target_ts} found in {export_dir}: {all_dirs}"
1544+
1545+
for name in chosen:
1546+
src = os.path.join(export_dir, name)
1547+
dest_path = f"/Root/.backups/collections/{coll_restore}/{name}"
1548+
r = yatest.common.execute(
1549+
[
1550+
backup_bin(),
1551+
"--verbose",
1552+
"--endpoint",
1553+
"grpc://localhost:%d" % self.cluster.nodes[1].grpc_port,
1554+
"--database",
1555+
self.root_dir,
1556+
"tools",
1557+
"restore",
1558+
"--path",
1559+
dest_path,
1560+
"--input",
1561+
src,
1562+
],
1563+
check_exit_code=False,
1564+
)
1565+
out = (r.std_out or b"").decode("utf-8", "ignore")
1566+
err = (r.std_err or b"").decode("utf-8", "ignore")
1567+
assert r.exit_code == 0, f"tools restore import failed for {name}: stdout={out} stderr={err}"
1568+
1569+
# wait for imported snapshots to appear in scheme
1570+
deadline = time.time() + 60
1571+
expected = set(chosen)
1572+
while time.time() < deadline:
1573+
kids = set(self.get_collection_children(coll_restore))
1574+
if expected.issubset(kids):
1575+
break
1576+
time.sleep(1)
1577+
else:
1578+
raise AssertionError(
1579+
f"Imported snapshots did not appear in collection {coll_restore} within timeout. Expected: {sorted(chosen)}"
1580+
)
1581+
1582+
return chosen
1583+
1584+
def test_incremental_chain_restore_when_tables_deleted(self):
1585+
"""Create chain full -> inc1 -> inc2 -> inc3, export/import up to inc2, delete tables and restore."""
1586+
# Setup
1587+
collection_src, t_orders, t_products = self._setup_test_collections()
1588+
full_orders = f"/Root/{t_orders}"
1589+
full_products = f"/Root/{t_products}"
1590+
1591+
# Create incremental-enabled collection
1592+
create_collection_sql = f"""
1593+
CREATE BACKUP COLLECTION `{collection_src}`
1594+
( TABLE `{full_orders}`, TABLE `{full_products}` )
1595+
WITH ( STORAGE = 'cluster', INCREMENTAL_BACKUP_ENABLED = 'true' );
1596+
"""
1597+
create_res = self._execute_yql(create_collection_sql)
1598+
assert create_res.exit_code == 0, f"CREATE BACKUP COLLECTION failed: {getattr(create_res, 'std_err', None)}"
1599+
self.wait_for_collection(collection_src, timeout_s=30)
1600+
1601+
created_snapshots = []
1602+
snapshot_rows = {} # snapshot_name -> {"orders": rows, "products": rows}
1603+
1604+
# Full backup
1605+
r = self._execute_yql(f"BACKUP `{collection_src}`;")
1606+
assert r.exit_code == 0, f"FULL BACKUP 1 failed: {getattr(r, 'std_err', None)}"
1607+
self.wait_for_collection_has_snapshot(collection_src, timeout_s=30)
1608+
self._record_snapshot_and_rows(collection_src, t_orders, t_products, created_snapshots, snapshot_rows)
1609+
1610+
# change data and create incremental 1
1611+
self._apply_sql_mutations(
1612+
'PRAGMA TablePathPrefix("/Root"); UPSERT INTO orders (id, number, txt) VALUES (10, 1000, "inc1");',
1613+
'PRAGMA TablePathPrefix("/Root"); DELETE FROM products WHERE id = 1;'
1614+
)
1615+
r = self._execute_yql(f"BACKUP `{collection_src}` INCREMENTAL;")
1616+
assert r.exit_code == 0, "INCREMENTAL 1 failed"
1617+
self._record_snapshot_and_rows(collection_src, t_orders, t_products, created_snapshots, snapshot_rows)
1618+
1619+
# change data and create incremental 2
1620+
self._apply_sql_mutations(
1621+
'PRAGMA TablePathPrefix("/Root"); UPSERT INTO orders (id, number, txt) VALUES (20, 2000, "inc2");',
1622+
'PRAGMA TablePathPrefix("/Root"); DELETE FROM orders WHERE id = 1;'
1623+
)
1624+
r = self._execute_yql(f"BACKUP `{collection_src}` INCREMENTAL;")
1625+
assert r.exit_code == 0, "INCREMENTAL 2 failed"
1626+
snap_inc2 = self._record_snapshot_and_rows(collection_src, t_orders, t_products, created_snapshots, snapshot_rows)
1627+
1628+
# change data and create incremental 3
1629+
self._apply_sql_mutations(
1630+
'PRAGMA TablePathPrefix("/Root"); UPSERT INTO orders (id, number, txt) VALUES (30, 3000, "inc3");'
1631+
)
1632+
r = self._execute_yql(f"BACKUP `{collection_src}` INCREMENTAL;")
1633+
assert r.exit_code == 0, "INCREMENTAL 3 failed"
1634+
self._record_snapshot_and_rows(collection_src, t_orders, t_products, created_snapshots, snapshot_rows)
1635+
1636+
assert len(created_snapshots) >= 2, "Expected at least 1 full + incrementals"
1637+
1638+
# Export backups
1639+
export_dir, exported_items = self._export_backups(collection_src)
1640+
assert exported_items, "No exported snapshots found"
1641+
exported_dirs = sorted([d for d in os.listdir(export_dir) if os.path.isdir(os.path.join(export_dir, d))])
1642+
for s in created_snapshots:
1643+
assert s in exported_dirs, f"Recorded snapshot {s} not found in exported dirs {exported_dirs}"
1644+
1645+
# Create restore collection and import snapshots up to target (choose inc2)
1646+
target_snap = snap_inc2
1647+
target_ts = target_snap.split("_", 1)[0]
1648+
1649+
coll_restore = f"coll_restore_incr_{int(time.time())}"
1650+
create_restore_sql = f"""
1651+
CREATE BACKUP COLLECTION `{coll_restore}`
1652+
( TABLE `{full_orders}`, TABLE `{full_products}` )
1653+
WITH ( STORAGE = 'cluster' );
1654+
"""
1655+
res = self._execute_yql(create_restore_sql)
1656+
assert res.exit_code == 0, f"CREATE backup collection {coll_restore} failed"
1657+
self.wait_for_collection(coll_restore, timeout_s=30)
1658+
1659+
self._import_exported_snapshots_up_to(coll_restore, export_dir, target_ts)
1660+
time.sleep(1)
1661+
self._drop_tables([t_orders, t_products])
1662+
1663+
# Run RESTORE
1664+
res_restore = self._execute_yql(f"RESTORE `{coll_restore}`;")
1665+
assert res_restore.exit_code == 0, f"RESTORE failed: {getattr(res_restore, 'std_err', None) or getattr(res_restore, 'std_out', None)}"
1666+
1667+
# Verify restored data matches snapshot inc2
1668+
expected_orders = snapshot_rows[target_snap]["orders"]
1669+
expected_products = snapshot_rows[target_snap]["products"]
1670+
1671+
self._verify_restored_table_data(t_orders, expected_orders)
1672+
self._verify_restored_table_data(t_products, expected_products)
1673+
1674+
# Check whether original collection still present or removed (either is acceptable)
1675+
coll_present = self.collection_exists(collection_src)
1676+
if not coll_present:
1677+
logger.info("Starting collection %s not present (deleted) — OK", collection_src)
1678+
else:
1679+
logger.info(
1680+
f"Starting collection {collection_src} is present and incremental backups appear enabled. "
1681+
"Expected: starting collection removed OR incremental backups disabled."
1682+
)
1683+
1684+
if os.path.exists(export_dir):
1685+
shutil.rmtree(export_dir)

0 commit comments

Comments
 (0)