Rework find_and_delete_entries (bugfix and better testing) #172

Merged: 2 commits, Oct 2, 2024
1 change: 1 addition & 0 deletions .gitignore
@@ -3,6 +3,7 @@
 # Environments
 .env
 .testing.env
+.local.testing.env
 env

### (abridged) BOILERPLATE STARTS HERE
5 changes: 5 additions & 0 deletions CHANGES.txt
@@ -3,6 +3,11 @@ v 0.1.10
 Add method to get session and keyspace from parameters.
 Calling `cassio.init()` with insufficient arguments now raises an exception.
 Fixed: bug when metadata key contains json and, in general, curly braces (by @epinzur)
+Bugfix: find_and_delete_entries (metadata mixin) now uses the provided batch_size
+Bugfix: find_and_delete_entries (metadata mixin) made compatible with clustered mixin
+Added testing for find_and_delete_entries:
+- with clustered mixin and all call patterns (w/out partition, w/out row_id)
+- enhanced testing on simple metadata table (with counting checks, all call patterns)

 v 0.1.9
 =======
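To make the batch_size fix concrete, here is a minimal usage sketch (the session handling and the "demo_table" name are hypothetical, not taken from this PR). Before the fix, find_and_delete_entries silently used a hardcoded batch size of 20; with it, the caller-supplied value bounds each find-and-delete round.

# A minimal sketch, assuming an open cassandra-driver Session and an
# existing keyspace; "demo_table" is a hypothetical table name.
from cassandra.cluster import Session

from cassio.table.tables import MetadataCassandraTable


def purge_alpha_rows(session: Session, keyspace: str) -> int:
    table = MetadataCassandraTable(
        session=session,
        keyspace=keyspace,
        table="demo_table",
    )
    # With this PR, batch_size=25 is actually honored (it was previously
    # overridden by a hardcoded 20 inside the method).
    return table.find_and_delete_entries(
        metadata={"field": "alpha"},
        batch_size=25,
    )
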
42 changes: 34 additions & 8 deletions src/cassio/table/mixins/metadata.py
@@ -279,22 +279,48 @@ def _get_find_entries_cql(
         )
         return select_cql, select_vals

-    def find_entries(self, n: int, **kwargs: Any) -> Iterable[RowType]:
+    def _find_unnormalized_entries(self, n: int, **kwargs: Any) -> Iterable[RowType]:
         select_cql, select_vals = self._get_find_entries_cql(n, **kwargs)
         result_set = self.execute_cql(
             select_cql, args=select_vals, op_type=CQLOpType.READ
         )
-        return (self._normalize_row(result) for result in result_set)
+        return (
+            raw_row if isinstance(raw_row, dict) else raw_row._asdict()  # type: ignore[attr-defined]
+            for raw_row in result_set
+        )
+
+    def find_entries(self, n: int, **kwargs: Any) -> Iterable[RowType]:
+        return (
+            self._normalize_row(result)
+            for result in self._find_unnormalized_entries(
+                n=n,
+                **kwargs,
+            )
+        )

     def find_entries_async(self, n: int, **kwargs: Any) -> ResponseFuture:
         raise NotImplementedError("Asynchronous reads are not supported.")

-    async def afind_entries(self, n: int, **kwargs: Any) -> Iterable[RowType]:
+    async def _afind_unnormalized_entries(
+        self, n: int, **kwargs: Any
+    ) -> Iterable[RowType]:
         select_cql, select_vals = self._get_find_entries_cql(n, **kwargs)
         result_set = await self.aexecute_cql(
             select_cql, args=select_vals, op_type=CQLOpType.READ
         )
-        return (self._normalize_row(result) for result in result_set)
+        return (
+            raw_row if isinstance(raw_row, dict) else raw_row._asdict()  # type: ignore[attr-defined]
+            for raw_row in result_set
+        )
+
+    async def afind_entries(self, n: int, **kwargs: Any) -> Iterable[RowType]:
+        return (
+            self._normalize_row(result)
+            for result in await self._afind_unnormalized_entries(
+                n=n,
+                **kwargs,
+            )
+        )

     @staticmethod
     def _get_to_delete_and_visited(
@@ -322,15 +348,13 @@ def find_and_delete_entries(
         # TODO: Use the 'columns' for a narrowed projection
         # TODO: decouple finding and deleting (streaming) for faster performance
         primary_key_cols = [col for col, _ in self._schema_primary_key()]
-        #
-        batch_size = 20
         to_delete, visited_tuples = self._get_to_delete_and_visited(
             n, batch_size, set()
         )
         while to_delete > 0:
             del_pkargs = [
                 [found_row[pkc] for pkc in primary_key_cols]
-                for found_row in self.find_entries(n=to_delete, **kwargs)
+                for found_row in self._find_unnormalized_entries(n=to_delete, **kwargs)
             ]
             if del_pkargs == []:
                 break
@@ -366,7 +390,9 @@ async def afind_and_delete_entries(
         while to_delete > 0:
             del_pkargs = [
                 [found_row[pkc] for pkc in primary_key_cols]
-                for found_row in await self.afind_entries(n=to_delete, **kwargs)
+                for found_row in await self._afind_unnormalized_entries(
+                    n=to_delete, **kwargs
+                )
             ]
             delete_coros = [
                 self.adelete(
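The reason for the find/normalize split above: the deletion loops index each found row by the schema's primary-key column names (primary_key_cols), and those names are only guaranteed to appear in the driver's raw rows; the normalized view (in particular with the clustered mixin, which presumably regroups key columns into user-facing partition_id/row_id values) need not carry them. A self-contained illustration, with a hypothetical Row named tuple standing in for the driver's row type:

# Hypothetical illustration (not code from this PR): the driver returns
# rows as named tuples; _asdict() recovers the raw column mapping whose
# keys line up with the schema's primary-key column names.
from collections import namedtuple

Row = namedtuple("Row", ["partition_id", "row_id", "body_blob"])

raw_row = Row(partition_id="C", row_id="dele", body_blob="...")
primary_key_cols = ["partition_id", "row_id"]

row_dict = raw_row._asdict()  # {"partition_id": "C", "row_id": "dele", ...}
del_pkargs = [row_dict[pkc] for pkc in primary_key_cols]
assert del_pkargs == ["C", "dele"]
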
2 changes: 2 additions & 0 deletions tests/conftest.py
@@ -9,6 +9,7 @@
 import pytest
 from cassandra.auth import PlainTextAuthProvider
 from cassandra.cluster import Cluster, Session
+from cassandra.protocol import ProtocolVersion
 from dotenv import load_dotenv
 from testcontainers.core.container import DockerContainer
 from testcontainers.core.waiting_utils import wait_for_logs
@@ -82,6 +83,7 @@ def db_session(cassandra_port: int) -> Iterator[Session]:
                 ASTRA_DB_CLIENT_ID,
                 ASTRA_DB_APPLICATION_TOKEN,
             ),
+            protocol_version=ProtocolVersion.V4,
         )
         yield cluster.connect()
     elif mode in ["LOCAL_CASSANDRA", "TESTCONTAINERS_CASSANDRA"]:
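For context, pinning the native protocol version as the fixture now does looks like this against a plain local cluster (a sketch under assumed defaults: the 127.0.0.1 contact point and port 9042 are not from this PR):

from cassandra.cluster import Cluster
from cassandra.protocol import ProtocolVersion

# Pin the native protocol instead of letting the driver negotiate it;
# V4 matches what the Astra DB branch of the fixture now requests.
cluster = Cluster(
    contact_points=["127.0.0.1"],
    port=9042,
    protocol_version=ProtocolVersion.V4,
)
session = cluster.connect()
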
265 changes: 265 additions & 0 deletions tests/integration/test_tableclasses_clusteredmetadatacassandratable.py
@@ -0,0 +1,265 @@
"""
Table classes integration test - ClusteredMetadataCassandraTable
"""

import asyncio

import pytest
from cassandra.cluster import Session

from cassio.table.tables import ClusteredMetadataCassandraTable


@pytest.mark.usefixtures("db_session", "db_keyspace")
class TestClusteredMetadataCassandraTable:
    def test_find_and_delete_entries_sync(
        self, db_session: Session, db_keyspace: str
    ) -> None:
"""
Plan for the rows in this table:

primary key partition key metadata
-----------------------------------------------------------
("C", "up") ("dele", 0) {"field": "alpha"}
("C", "up") ("dele", 1) {"field": "alpha"}
...
("C", "up") ("good", 0) {"field": "omega"}
("C", "up") ("good", 1) {"field": "omega"}
...
("C", "dn") ("dele", 0) {"field": "alpha"}
("C", "dn") ("dele", 1) {"field": "alpha"}
...
("C", "dn") ("good", 0) {"field": "omega"}
("C", "dn") ("good", 1) {"field": "omega"}
...

for a total of 2 x 2 x ONEFOURTH_N_ROWS:
a 2 due to up/dn in part key
a 2 due to dele/good in clustering
The total rows to delete, i.e. those with alpha, are 2 x ONEFOURTH_N_ROWS.
"""
        table_name_fad = "cm_ct"
        ONEFOURTH_N_ROWS = 128
        FAD_MAX_COUNT = 30  # must be < ONEFOURTH_N_ROWS for full testing
        FAD_BATCH_SIZE = 25  # must be < FAD_MAX_COUNT - 1 for full testing
        db_session.execute(f"DROP TABLE IF EXISTS {db_keyspace}.{table_name_fad};")
        t_fad = ClusteredMetadataCassandraTable(
            session=db_session,
            keyspace=db_keyspace,
            table=table_name_fad,
            primary_key_type=["TEXT", "TEXT", "TEXT", "INT"],
            num_partition_keys=2,
        )
        futures = [
            t_fad.put_async(
                partition_id=("C", part_k),
                row_id=(["good", "dele"][dele_status], row_i),
                body_blob=(
                    f"PART_{part_k} / ROWID_{['good', 'dele'][dele_status]} "
                    f"/ md_{['omega', 'alpha'][dele_status]}"
                ),
                metadata={"field": ["omega", "alpha"][dele_status]},
            )
            for row_i in range(ONEFOURTH_N_ROWS)
            for dele_status in [1, 0]  # 1 means "alpha", i.e. delete-me
            for part_k in ["up", "dn"]
        ]
        for f in futures:
            _ = f.result()
        #
        q_md = {"field": "alpha"}

        total_matching_rows = 2 * ONEFOURTH_N_ROWS

        num_found_items_0a = len(
            list(t_fad.find_entries(n=total_matching_rows + 1, metadata=q_md))
        )
        assert num_found_items_0a == total_matching_rows

        # find_and_delete calls without match:
        num_deleted0a = t_fad.find_and_delete_entries(
            metadata=q_md,
            partition_id=("X", "up"),
        )
        assert num_deleted0a == 0
        num_deleted0b = t_fad.find_and_delete_entries(
            metadata=q_md,
            partition_id=("C", "up"),
            row_id=("no", -1),
        )
        assert num_deleted0b == 0
        num_deleted0c = t_fad.find_and_delete_entries(
            metadata=q_md,
            partition_id=("C", "up"),
            row_id=("good", 0),
        )
        assert num_deleted0c == 0

        num_found_items_0b = len(
            list(t_fad.find_entries(n=total_matching_rows + 1, metadata=q_md))
        )
        assert num_found_items_0b == total_matching_rows

        # one-item deletion
        num_deleted1 = t_fad.find_and_delete_entries(
            metadata=q_md,
            partition_id=("C", "up"),
            row_id=("dele", 0),
        )
        assert num_deleted1 == 1
        num_found_items_1 = len(
            list(t_fad.find_entries(n=total_matching_rows + 1, metadata=q_md))
        )
        assert num_found_items_1 == total_matching_rows - 1

        # deletion of part of a partition
        num_deleted_p = t_fad.find_and_delete_entries(
            metadata=q_md,
            partition_id=("C", "up"),
            n=FAD_MAX_COUNT,
            batch_size=FAD_BATCH_SIZE,
        )
        assert num_deleted_p == FAD_MAX_COUNT
        num_found_items_p = len(
            list(t_fad.find_entries(n=total_matching_rows + 1, metadata=q_md))
        )
        assert num_found_items_p == total_matching_rows - FAD_MAX_COUNT - 1

        # deletion of the rest of the partition
        num_deleted_p2 = t_fad.find_and_delete_entries(
            metadata=q_md,
            partition_id=("C", "up"),
            batch_size=FAD_BATCH_SIZE,
        )
        assert num_deleted_p2 == ONEFOURTH_N_ROWS - FAD_MAX_COUNT - 1
        num_found_items_p2 = len(
            list(t_fad.find_entries(n=total_matching_rows + 1, metadata=q_md))
        )
        assert num_found_items_p2 == total_matching_rows - ONEFOURTH_N_ROWS

        # deletion of everything that remains
        num_deleted_a = t_fad.find_and_delete_entries(
            metadata=q_md,
            batch_size=FAD_BATCH_SIZE,
        )
        assert num_deleted_a == ONEFOURTH_N_ROWS
        num_found_items_a = len(
            list(t_fad.find_entries(n=total_matching_rows + 1, metadata=q_md))
        )
        assert num_found_items_a == 0

    @pytest.mark.asyncio
    async def test_find_and_delete_entries_async(
        self, db_session: Session, db_keyspace: str
    ) -> None:
        """Same logic as for the sync counterpart."""
        table_name_fad = "cm_ct"
        ONEFOURTH_N_ROWS = 128
        FAD_MAX_COUNT = 30  # must be < ONEFOURTH_N_ROWS for full testing
        FAD_BATCH_SIZE = 25  # must be < FAD_MAX_COUNT - 1 for full testing
        db_session.execute(f"DROP TABLE IF EXISTS {db_keyspace}.{table_name_fad};")
        t_fad = ClusteredMetadataCassandraTable(
            session=db_session,
            keyspace=db_keyspace,
            table=table_name_fad,
            primary_key_type=["TEXT", "TEXT", "TEXT", "INT"],
            num_partition_keys=2,
        )

        coros = [
            t_fad.aput(
                partition_id=("C", part_k),
                row_id=(["good", "dele"][dele_status], row_i),
                body_blob=(
                    f"PART_{part_k} / ROWID_{['good', 'dele'][dele_status]} "
                    f"/ md_{['omega', 'alpha'][dele_status]}"
                ),
                metadata={"field": ["omega", "alpha"][dele_status]},
            )
            for row_i in range(ONEFOURTH_N_ROWS)
            for dele_status in [1, 0]  # 1 means "alpha", i.e. delete-me
            for part_k in ["up", "dn"]
        ]
        await asyncio.gather(*coros)

        #
        q_md = {"field": "alpha"}

        total_matching_rows = 2 * ONEFOURTH_N_ROWS

        num_found_items_0a = len(
            list(await t_fad.afind_entries(n=total_matching_rows + 1, metadata=q_md))
        )
        assert num_found_items_0a == total_matching_rows

        # find_and_delete calls without match:
        num_deleted0a = await t_fad.afind_and_delete_entries(
            metadata=q_md,
            partition_id=("X", "up"),
        )
        assert num_deleted0a == 0
        num_deleted0b = await t_fad.afind_and_delete_entries(
            metadata=q_md,
            partition_id=("C", "up"),
            row_id=("no", -1),
        )
        assert num_deleted0b == 0
        num_deleted0c = await t_fad.afind_and_delete_entries(
            metadata=q_md,
            partition_id=("C", "up"),
            row_id=("good", 0),
        )
        assert num_deleted0c == 0

        num_found_items_0b = len(
            list(await t_fad.afind_entries(n=total_matching_rows + 1, metadata=q_md))
        )
        assert num_found_items_0b == total_matching_rows

        # one-item deletion
        num_deleted1 = await t_fad.afind_and_delete_entries(
            metadata=q_md,
            partition_id=("C", "up"),
            row_id=("dele", 0),
        )
        assert num_deleted1 == 1
        num_found_items_1 = len(
            list(await t_fad.afind_entries(n=total_matching_rows + 1, metadata=q_md))
        )
        assert num_found_items_1 == total_matching_rows - 1

        # deletion of part of a partition
        num_deleted_p = await t_fad.afind_and_delete_entries(
            metadata=q_md,
            partition_id=("C", "up"),
            n=FAD_MAX_COUNT,
            batch_size=FAD_BATCH_SIZE,
        )
        assert num_deleted_p == FAD_MAX_COUNT
        num_found_items_p = len(
            list(await t_fad.afind_entries(n=total_matching_rows + 1, metadata=q_md))
        )
        assert num_found_items_p == total_matching_rows - FAD_MAX_COUNT - 1

        # deletion of the rest of the partition
        num_deleted_p2 = await t_fad.afind_and_delete_entries(
            metadata=q_md,
            partition_id=("C", "up"),
            batch_size=FAD_BATCH_SIZE,
        )
        assert num_deleted_p2 == ONEFOURTH_N_ROWS - FAD_MAX_COUNT - 1
        num_found_items_p2 = len(
            list(await t_fad.afind_entries(n=total_matching_rows + 1, metadata=q_md))
        )
        assert num_found_items_p2 == total_matching_rows - ONEFOURTH_N_ROWS

        # deletion of everything that remains
        num_deleted_a = await t_fad.afind_and_delete_entries(
            metadata=q_md,
            batch_size=FAD_BATCH_SIZE,
        )
        assert num_deleted_a == ONEFOURTH_N_ROWS
        num_found_items_a = len(
            list(await t_fad.afind_entries(n=total_matching_rows + 1, metadata=q_md))
        )
        assert num_found_items_a == 0
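
As a sanity check on the staged deletions above, here is a standalone arithmetic sketch (not part of the test file): the four deletion rounds must account for every matching row.

ONEFOURTH_N_ROWS = 128
FAD_MAX_COUNT = 30

total_matching = 2 * ONEFOURTH_N_ROWS  # all "alpha" rows across both partitions
deleted_single = 1  # the one targeted ("dele", 0) row
deleted_capped = FAD_MAX_COUNT  # the round capped by n=FAD_MAX_COUNT
deleted_rest_of_partition = ONEFOURTH_N_ROWS - FAD_MAX_COUNT - 1
deleted_other_partition = ONEFOURTH_N_ROWS  # everything left, in ("C", "dn")

assert (
    deleted_single
    + deleted_capped
    + deleted_rest_of_partition
    + deleted_other_partition
) == total_matching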