Skip to content

ya style datashard #18128

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 44 additions & 36 deletions ydb/tests/datashard/async_replication/test_async_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,17 @@
from ydb.tests.library.common.wait_for import wait_for
from ydb.tests.datashard.lib.multicluster_test_base import MulticlusterTestBase
from ydb.tests.datashard.lib.dml_operations import DMLOperations
from ydb.tests.datashard.lib.types_of_variables import pk_types, non_pk_types, index_first, index_second, \
index_first_sync, index_second_sync, index_three_sync, index_four_sync, index_zero_sync
from ydb.tests.datashard.lib.types_of_variables import (
pk_types,
non_pk_types,
index_first,
index_second,
index_first_sync,
index_second_sync,
index_three_sync,
index_four_sync,
index_zero_sync,
)


class TestAsyncReplication(MulticlusterTestBase):
Expand All @@ -23,68 +32,67 @@ class TestAsyncReplication(MulticlusterTestBase):
# index_first_sync, "", "UNIQUE", "SYNC"),
# ("table_index_0_UNIQUE_SYNC", pk_types, {},
# index_zero_sync, "", "UNIQUE", "SYNC"),
("table_index_4__SYNC", pk_types, {},
index_four_sync, "", "", "SYNC"),
("table_index_3__SYNC", pk_types, {},
index_three_sync, "", "", "SYNC"),
("table_index_2__SYNC", pk_types, {},
index_second_sync, "", "", "SYNC"),
("table_index_1__SYNC", pk_types, {},
index_first_sync, "", "", "SYNC"),
("table_index_0__SYNC", pk_types, {},
index_zero_sync, "", "", "SYNC"),
("table_index_4__SYNC", pk_types, {}, index_four_sync, "", "", "SYNC"),
("table_index_3__SYNC", pk_types, {}, index_three_sync, "", "", "SYNC"),
("table_index_2__SYNC", pk_types, {}, index_second_sync, "", "", "SYNC"),
("table_index_1__SYNC", pk_types, {}, index_first_sync, "", "", "SYNC"),
("table_index_0__SYNC", pk_types, {}, index_zero_sync, "", "", "SYNC"),
("table_index_1__ASYNC", pk_types, {}, index_second, "", "", "ASYNC"),
("table_index_0__ASYNC", pk_types, {}, index_first, "", "", "ASYNC"),
("table_all_types", pk_types, {
**pk_types, **non_pk_types}, {}, "", "", ""),
("table_all_types", pk_types, {**pk_types, **non_pk_types}, {}, "", "", ""),
("table_ttl_DyNumber", pk_types, {}, {}, "DyNumber", "", ""),
("table_ttl_Uint32", pk_types, {}, {}, "Uint32", "", ""),
("table_ttl_Uint64", pk_types, {}, {}, "Uint64", "", ""),
("table_ttl_Datetime", pk_types, {}, {}, "Datetime", "", ""),
("table_ttl_Timestamp", pk_types, {}, {}, "Timestamp", "", ""),
("table_ttl_Date", pk_types, {}, {}, "Date", "", ""),
]
],
)
def test_async_replication(self, table_name: str, pk_types: dict[str, str], all_types: dict[str, str], index: dict[str, str], ttl: str, unique: str, sync: str):
dml_cluster_1 = DMLOperations(Query.create(
self.get_database(), self.get_endpoint(self.clusters[0])))
dml_cluster_2 = DMLOperations(Query.create(
self.get_database(), self.get_endpoint(self.clusters[1])))
def test_async_replication(
self,
table_name: str,
pk_types: dict[str, str],
all_types: dict[str, str],
index: dict[str, str],
ttl: str,
unique: str,
sync: str,
):
dml_cluster_1 = DMLOperations(Query.create(self.get_database(), self.get_endpoint(self.clusters[0])))
dml_cluster_2 = DMLOperations(Query.create(self.get_database(), self.get_endpoint(self.clusters[1])))

dml_cluster_1.create_table(table_name, pk_types, all_types,
index, ttl, unique, sync)
dml_cluster_1.create_table(table_name, pk_types, all_types, index, ttl, unique, sync)
dml_cluster_1.insert(table_name, all_types, pk_types, index, ttl)
dml_cluster_2.query(f"""
dml_cluster_2.query(
f"""
CREATE ASYNC REPLICATION `replication_{table_name}`
FOR `{self.get_database()}/{table_name}` AS `{self.get_database()}/{table_name}`
WITH (
CONNECTION_STRING = 'grpc://{self.get_endpoint(self.clusters[0])}/?database={self.get_database()}'
)
""")
"""
)
for _ in range(100):
try:
dml_cluster_2.query(
f"select count(*) as count from {table_name}")
dml_cluster_2.query(f"select count(*) as count from {table_name}")
break
except Exception:
time.sleep(1)
dml_cluster_2.select_after_insert(
table_name, all_types, pk_types, index, ttl)
dml_cluster_2.select_after_insert(table_name, all_types, pk_types, index, ttl)
dml_cluster_1.query(f"delete from {table_name}")
assert wait_for(self.create_predicate(True, table_name, dml_cluster_2.query),
timeout_seconds=100) is True, "Expected zero rows after delete"
assert (
wait_for(self.create_predicate(True, table_name, dml_cluster_2.query), timeout_seconds=100) is True
), "Expected zero rows after delete"
dml_cluster_1.insert(table_name, all_types, pk_types, index, ttl)
wait_for(self.create_predicate(False, table_name,
dml_cluster_2.query), timeout_seconds=100)
dml_cluster_2.select_after_insert(
table_name, all_types, pk_types, index, ttl)
wait_for(self.create_predicate(False, table_name, dml_cluster_2.query), timeout_seconds=100)
dml_cluster_2.select_after_insert(table_name, all_types, pk_types, index, ttl)

def create_predicate(self, is_zero, table_name, dml_cluster):
def predicate():
rows = dml_cluster(
f"select count(*) as count from {table_name}")
rows = dml_cluster(f"select count(*) as count from {table_name}")
if is_zero:
return len(rows) == 1 and rows[0].count == 0
else:
return len(rows) == 1 and rows[0].count != 0

return predicate
85 changes: 48 additions & 37 deletions ydb/tests/datashard/copy_table/test_copy_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,57 +4,68 @@

from ydb.tests.sql.lib.test_base import TestBase
from ydb.tests.datashard.lib.dml_operations import DMLOperations
from ydb.tests.datashard.lib.types_of_variables import pk_types, non_pk_types, index_first, index_second, \
index_first_sync, index_second_sync, index_three_sync, index_three_sync_not_Bool, index_four_sync, index_zero_sync
from ydb.tests.datashard.lib.types_of_variables import (
pk_types,
non_pk_types,
index_first,
index_second,
index_first_sync,
index_second_sync,
index_three_sync,
index_three_sync_not_Bool,
index_four_sync,
index_zero_sync,
)


class TestCopyTable(TestBase):
@pytest.mark.parametrize(
"table_name, pk_types, all_types, index, ttl, unique, sync",
[
("table_index_4_UNIQUE_SYNC", pk_types, {},
index_four_sync, "", "UNIQUE", "SYNC"),
("table_index_3_UNIQUE_SYNC", pk_types, {},
index_three_sync_not_Bool, "", "UNIQUE", "SYNC"),
("table_index_2_UNIQUE_SYNC", pk_types, {},
index_second_sync, "", "UNIQUE", "SYNC"),
("table_index_1_UNIQUE_SYNC", pk_types, {},
index_first_sync, "", "UNIQUE", "SYNC"),
("table_index_0_UNIQUE_SYNC", pk_types, {},
index_zero_sync, "", "UNIQUE", "SYNC"),
("table_index_4__SYNC", pk_types, {},
index_four_sync, "", "", "SYNC"),
("table_index_3__SYNC", pk_types, {},
index_three_sync, "", "", "SYNC"),
("table_index_2__SYNC", pk_types, {},
index_second_sync, "", "", "SYNC"),
("table_index_1__SYNC", pk_types, {},
index_first_sync, "", "", "SYNC"),
("table_index_0__SYNC", pk_types, {},
index_zero_sync, "", "", "SYNC"),
("table_index_4_UNIQUE_SYNC", pk_types, {}, index_four_sync, "", "UNIQUE", "SYNC"),
("table_index_3_UNIQUE_SYNC", pk_types, {}, index_three_sync_not_Bool, "", "UNIQUE", "SYNC"),
("table_index_2_UNIQUE_SYNC", pk_types, {}, index_second_sync, "", "UNIQUE", "SYNC"),
("table_index_1_UNIQUE_SYNC", pk_types, {}, index_first_sync, "", "UNIQUE", "SYNC"),
("table_index_0_UNIQUE_SYNC", pk_types, {}, index_zero_sync, "", "UNIQUE", "SYNC"),
("table_index_4__SYNC", pk_types, {}, index_four_sync, "", "", "SYNC"),
("table_index_3__SYNC", pk_types, {}, index_three_sync, "", "", "SYNC"),
("table_index_2__SYNC", pk_types, {}, index_second_sync, "", "", "SYNC"),
("table_index_1__SYNC", pk_types, {}, index_first_sync, "", "", "SYNC"),
("table_index_0__SYNC", pk_types, {}, index_zero_sync, "", "", "SYNC"),
("table_index_1__ASYNC", pk_types, {}, index_second, "", "", "ASYNC"),
("table_index_0__ASYNC", pk_types, {}, index_first, "", "", "ASYNC"),
("table_all_types", pk_types, {
**pk_types, **non_pk_types}, {}, "", "", ""),
("table_all_types", pk_types, {**pk_types, **non_pk_types}, {}, "", "", ""),
("table_ttl_DyNumber", pk_types, {}, {}, "DyNumber", "", ""),
("table_ttl_Uint32", pk_types, {}, {}, "Uint32", "", ""),
("table_ttl_Uint64", pk_types, {}, {}, "Uint64", "", ""),
("table_ttl_Datetime", pk_types, {}, {}, "Datetime", "", ""),
("table_ttl_Timestamp", pk_types, {}, {}, "Timestamp", "", ""),
("table_ttl_Date", pk_types, {}, {}, "Date", "", ""),
]
],
)
def test_copy_table(self, table_name: str, pk_types: dict[str, str], all_types: dict[str, str], index: dict[str, str], ttl: str, unique: str, sync: str):
def test_copy_table(
self,
table_name: str,
pk_types: dict[str, str],
all_types: dict[str, str],
index: dict[str, str],
ttl: str,
unique: str,
sync: str,
):
dml = DMLOperations(self)
dml.create_table(table_name, pk_types, all_types,
index, ttl, unique, sync)
dml.create_table(table_name, pk_types, all_types, index, ttl, unique, sync)
dml.insert(table_name, all_types, pk_types, index, ttl)
yatest.common.execute([
yatest.common.binary_path(os.getenv('YDB_CLI_BINARY')),
'-e', 'grpc://'+self.get_endpoint(),
f"--database={self.get_database()}",
'tools', 'copy',
'--item', f"destination=copy_{table_name},source={table_name}"
]).stdout.decode("utf-8")
dml.select_after_insert(
f"copy_{table_name}", all_types, pk_types, index, ttl)
yatest.common.execute(
[
yatest.common.binary_path(os.getenv('YDB_CLI_BINARY')),
'-e',
'grpc://' + self.get_endpoint(),
f"--database={self.get_database()}",
'tools',
'copy',
'--item',
f"destination=copy_{table_name},source={table_name}",
]
).stdout.decode("utf-8")
dml.select_after_insert(f"copy_{table_name}", all_types, pk_types, index, ttl)
63 changes: 35 additions & 28 deletions ydb/tests/datashard/dml/test_dml.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,50 +2,57 @@

from ydb.tests.sql.lib.test_base import TestBase
from ydb.tests.datashard.lib.dml_operations import DMLOperations
from ydb.tests.datashard.lib.types_of_variables import pk_types, non_pk_types, index_first, index_second, \
index_first_sync, index_second_sync, index_three_sync, index_three_sync_not_Bool, index_four_sync, index_zero_sync
from ydb.tests.datashard.lib.types_of_variables import (
pk_types,
non_pk_types,
index_first,
index_second,
index_first_sync,
index_second_sync,
index_three_sync,
index_three_sync_not_Bool,
index_four_sync,
index_zero_sync,
)


class TestDML(TestBase):
@pytest.mark.parametrize(
"table_name, pk_types, all_types, index, ttl, unique, sync",
[
("table_index_4_UNIQUE_SYNC", pk_types, {},
index_four_sync, "", "UNIQUE", "SYNC"),
("table_index_3_UNIQUE_SYNC", pk_types, {},
index_three_sync_not_Bool, "", "UNIQUE", "SYNC"),
("table_index_2_UNIQUE_SYNC", pk_types, {},
index_second_sync, "", "UNIQUE", "SYNC"),
("table_index_1_UNIQUE_SYNC", pk_types, {},
index_first_sync, "", "UNIQUE", "SYNC"),
("table_index_0_UNIQUE_SYNC", pk_types, {},
index_zero_sync, "", "UNIQUE", "SYNC"),
("table_index_4__SYNC", pk_types, {},
index_four_sync, "", "", "SYNC"),
("table_index_3__SYNC", pk_types, {},
index_three_sync, "", "", "SYNC"),
("table_index_2__SYNC", pk_types, {},
index_second_sync, "", "", "SYNC"),
("table_index_1__SYNC", pk_types, {},
index_first_sync, "", "", "SYNC"),
("table_index_0__SYNC", pk_types, {},
index_zero_sync, "", "", "SYNC"),
("table_index_4_UNIQUE_SYNC", pk_types, {}, index_four_sync, "", "UNIQUE", "SYNC"),
("table_index_3_UNIQUE_SYNC", pk_types, {}, index_three_sync_not_Bool, "", "UNIQUE", "SYNC"),
("table_index_2_UNIQUE_SYNC", pk_types, {}, index_second_sync, "", "UNIQUE", "SYNC"),
("table_index_1_UNIQUE_SYNC", pk_types, {}, index_first_sync, "", "UNIQUE", "SYNC"),
("table_index_0_UNIQUE_SYNC", pk_types, {}, index_zero_sync, "", "UNIQUE", "SYNC"),
("table_index_4__SYNC", pk_types, {}, index_four_sync, "", "", "SYNC"),
("table_index_3__SYNC", pk_types, {}, index_three_sync, "", "", "SYNC"),
("table_index_2__SYNC", pk_types, {}, index_second_sync, "", "", "SYNC"),
("table_index_1__SYNC", pk_types, {}, index_first_sync, "", "", "SYNC"),
("table_index_0__SYNC", pk_types, {}, index_zero_sync, "", "", "SYNC"),
("table_index_1__ASYNC", pk_types, {}, index_second, "", "", "ASYNC"),
("table_index_0__ASYNC", pk_types, {}, index_first, "", "", "ASYNC"),
("table_all_types", pk_types, {
**pk_types, **non_pk_types}, {}, "", "", ""),
("table_all_types", pk_types, {**pk_types, **non_pk_types}, {}, "", "", ""),
("table_ttl_DyNumber", pk_types, {}, {}, "DyNumber", "", ""),
("table_ttl_Uint32", pk_types, {}, {}, "Uint32", "", ""),
("table_ttl_Uint64", pk_types, {}, {}, "Uint64", "", ""),
("table_ttl_Datetime", pk_types, {}, {}, "Datetime", "", ""),
("table_ttl_Timestamp", pk_types, {}, {}, "Timestamp", "", ""),
("table_ttl_Date", pk_types, {}, {}, "Date", "", ""),
]
],
)
def test_dml(self, table_name: str, pk_types: dict[str, str], all_types: dict[str, str], index: dict[str, str], ttl: str, unique: str, sync: str):
def test_dml(
self,
table_name: str,
pk_types: dict[str, str],
all_types: dict[str, str],
index: dict[str, str],
ttl: str,
unique: str,
sync: str,
):
dml = DMLOperations(self)
dml.create_table(table_name, pk_types, all_types,
index, ttl, unique, sync)
dml.create_table(table_name, pk_types, all_types, index, ttl, unique, sync)
dml.insert(table_name, all_types, pk_types, index, ttl)
dml.select_all_type(table_name, all_types, pk_types, index, ttl)
dml.select_after_insert(table_name, all_types, pk_types, index, ttl)
Expand Down
Loading