Skip to content

Commit d35e7f4

Browse files
authored
chore: make precommit token check emulator-proof (#1402)
The Emulator returns an empty pre-commit token when a commit is attempted without a pre-commit token. This is different from not returning any pre-commit token at all. The check for 'did the Commit return a pre-commit token?' did not take this into account, which caused commits on the Emulator that needed to be retried, not to be retried. This again caused multiple test errors when running on the Emulator, as this would keep a transaction present on the test database on the Emulator, and the Emulator only supports one transaction at a time. These test failures went unnoticed, because the test configuration for the Emulator had pinned the Emulator version to 1.5.37, which did not support multiplexed sessions. This again caused the tests to fall back to using regular sessions. This change fixes the check for whether a pre-commit token was returned by a Commit. It also unpins the Emulator version for the system tests using default settings. This ensures that the tests actually use multiplexed sessions.
1 parent ffa5c9e commit d35e7f4

File tree

7 files changed

+33
-10
lines changed

7 files changed

+33
-10
lines changed

.github/workflows/integration-tests-against-emulator.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ jobs:
1010

1111
services:
1212
emulator:
13-
image: gcr.io/cloud-spanner-emulator/emulator:1.5.37
13+
image: gcr.io/cloud-spanner-emulator/emulator
1414
ports:
1515
- 9010:9010
1616
- 9020:9020

google/cloud/spanner_v1/snapshot.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,11 @@ def _restart_on_unavailable(
133133
# Update the transaction from the response.
134134
if transaction is not None:
135135
transaction._update_for_result_set_pb(item)
136-
if item.precommit_token is not None and transaction is not None:
136+
if (
137+
item._pb is not None
138+
and item._pb.HasField("precommit_token")
139+
and transaction is not None
140+
):
137141
transaction._update_for_precommit_token_pb(item.precommit_token)
138142

139143
if item.resume_token:
@@ -1029,7 +1033,7 @@ def _update_for_transaction_pb(self, transaction_pb: Transaction) -> None:
10291033
if self._transaction_id is None and transaction_pb.id:
10301034
self._transaction_id = transaction_pb.id
10311035

1032-
if transaction_pb.precommit_token:
1036+
if transaction_pb._pb.HasField("precommit_token"):
10331037
self._update_for_precommit_token_pb_unsafe(transaction_pb.precommit_token)
10341038

10351039
def _update_for_precommit_token_pb(

google/cloud/spanner_v1/transaction.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -328,14 +328,20 @@ def before_next_retry(nth_retry, delay_in_seconds):
328328
# successfully commit, and must be retried with the new precommit token.
329329
# The mutations should not be included in the new request, and no further
330330
# retries or exception handling should be performed.
331-
if commit_response_pb.precommit_token:
331+
if commit_response_pb._pb.HasField("precommit_token"):
332332
add_span_event(span, commit_retry_event_name)
333+
nth_request = database._next_nth_request
333334
commit_response_pb = api.commit(
334335
request=CommitRequest(
335336
precommit_token=commit_response_pb.precommit_token,
336337
**common_commit_request_args,
337338
),
338-
metadata=metadata,
339+
metadata=database.metadata_with_request_id(
340+
nth_request,
341+
1,
342+
metadata,
343+
span,
344+
),
339345
)
340346

341347
add_span_event(span, "Commit Done")
@@ -521,7 +527,7 @@ def wrapped_method(*args, **kwargs):
521527
if is_inline_begin:
522528
self._lock.release()
523529

524-
if result_set_pb.precommit_token is not None:
530+
if result_set_pb._pb.HasField("precommit_token"):
525531
self._update_for_precommit_token_pb(result_set_pb.precommit_token)
526532

527533
return result_set_pb.stats.row_count_exact

tests/system/test_database_api.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -569,7 +569,10 @@ def test_db_run_in_transaction_then_snapshot_execute_sql(shared_database):
569569
batch.delete(sd.TABLE, sd.ALL)
570570

571571
def _unit_of_work(transaction, test):
572-
rows = list(transaction.read(test.TABLE, test.COLUMNS, sd.ALL))
572+
# TODO: Remove query and execute a read instead when the Emulator has been fixed
573+
# and returns pre-commit tokens for streaming read results.
574+
rows = list(transaction.execute_sql(sd.SQL))
575+
# rows = list(transaction.read(test.TABLE, test.COLUMNS, sd.ALL))
573576
assert rows == []
574577

575578
transaction.insert_or_update(test.TABLE, test.COLUMNS, test.ROW_DATA)
@@ -882,7 +885,10 @@ def test_db_run_in_transaction_w_max_commit_delay(shared_database):
882885
batch.delete(sd.TABLE, sd.ALL)
883886

884887
def _unit_of_work(transaction, test):
885-
rows = list(transaction.read(test.TABLE, test.COLUMNS, sd.ALL))
888+
# TODO: Remove query and execute a read instead when the Emulator has been fixed
889+
# and returns pre-commit tokens for streaming read results.
890+
rows = list(transaction.execute_sql(sd.SQL))
891+
# rows = list(transaction.read(test.TABLE, test.COLUMNS, sd.ALL))
886892
assert rows == []
887893

888894
transaction.insert_or_update(test.TABLE, test.COLUMNS, test.ROW_DATA)

tests/system/test_session_api.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -932,6 +932,8 @@ def _transaction_read_then_raise(transaction):
932932
def test_transaction_read_and_insert_or_update_then_commit(
933933
sessions_database,
934934
sessions_to_delete,
935+
# TODO: Re-enable when the emulator returns pre-commit tokens for reads.
936+
not_emulator,
935937
):
936938
# [START spanner_test_dml_read_your_writes]
937939
sd = _sample_data
@@ -1586,7 +1588,11 @@ def _read_w_concurrent_update(transaction, pkey):
15861588
transaction.update(COUNTERS_TABLE, COUNTERS_COLUMNS, [[pkey, value + 1]])
15871589

15881590

1589-
def test_transaction_read_w_concurrent_updates(sessions_database):
1591+
def test_transaction_read_w_concurrent_updates(
1592+
sessions_database,
1593+
# TODO: Re-enable when the Emulator returns pre-commit tokens for streaming reads.
1594+
not_emulator,
1595+
):
15901596
pkey = "read_w_concurrent_updates"
15911597
_transaction_concurrency_helper(sessions_database, _read_w_concurrent_update, pkey)
15921598

tests/unit/test_snapshot.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ def _make_item(self, value, resume_token=b"", metadata=None):
158158
resume_token=resume_token,
159159
metadata=metadata,
160160
precommit_token=None,
161+
_pb=None,
161162
spec=["value", "resume_token", "metadata", "precommit_token"],
162163
)
163164

tests/unit/test_transaction.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -533,7 +533,7 @@ def _commit_helper(
533533
)
534534
commit.assert_any_call(
535535
request=expected_retry_request,
536-
metadata=base_metadata,
536+
metadata=expected_retry_metadata,
537537
)
538538

539539
if not HAS_OPENTELEMETRY_INSTALLED:

0 commit comments

Comments
 (0)