-
Notifications
You must be signed in to change notification settings - Fork 102
/
Copy pathtest_execute_async.py
211 lines (162 loc) · 7.62 KB
/
test_execute_async.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
import threading
import time
from typing import Iterator

import pytest

from databricks.sql.ae import (
    AsyncExecution,
    AsyncExecutionException,
    AsyncExecutionStatus,
)
from tests.e2e.test_driver import PySQLPytestTestCase

# Parameterised heavy query: the self cross-join of range({val}) produces
# val**2 rows, so `val` controls how much work the warehouse has to do.
BASE_LONG_QUERY = """
SELECT SUM(A.id - B.id)
FROM range({val}) A CROSS JOIN range({val}) B
GROUP BY (A.id - B.id)
"""

# Size intended to keep the query running for longer than one minute.
GT_ONE_MINUTE_VALUE = 100_000_000

# Tuned by hand against a serverless SQL warehouse. The query has to take
# longer than five seconds (so it bypasses directResults), yet still finish
# soon enough that its results can be fetched in a reasonable amount of time.
GT_FIVE_SECONDS_VALUE = 90_000

LONG_ISH_QUERY = BASE_LONG_QUERY.format(val=GT_FIVE_SECONDS_VALUE)
LONG_RUNNING_QUERY = BASE_LONG_QUERY.format(val=GT_ONE_MINUTE_VALUE)

# Expected to finish in under five seconds, so it always comes back as a
# direct result.
DIRECT_RESULTS_QUERY = "select :param `col`"
class TestExecuteAsync(PySQLPytestTestCase):
    """End-to-end tests for the async execution API (PECO-1263).

    Every test talks to a live SQL warehouse through ``self.connection()``.
    Several tests depend on the timing assumptions encoded in the module-level
    query constants; the two ``*_canary`` tests exist to detect when those
    assumptions stop holding.
    """

    # Upper bound for any polling loop, so a query that never leaves the
    # running state fails the test instead of hanging the whole run.
    POLL_TIMEOUT_SECONDS = 600
    # Delay between sync_status() calls while polling.
    POLL_INTERVAL_SECONDS = 1

    def _wait_for_completion(
        self, ae: AsyncExecution, timeout: float = POLL_TIMEOUT_SECONDS
    ) -> int:
        """Poll ``ae`` with ``sync_status()`` until it is no longer running.

        :param ae: the execution to poll.
        :param timeout: seconds to wait before failing the test.
        :returns: the number of ``sync_status()`` calls that were made.
        """
        deadline = time.monotonic() + timeout
        polls = 0
        while ae.is_running:
            if time.monotonic() >= deadline:
                pytest.fail(
                    f"query still running after {timeout}s ({polls} status polls)"
                )
            time.sleep(self.POLL_INTERVAL_SECONDS)
            ae.sync_status()
            polls += 1
        return polls

    @pytest.fixture(scope="function")
    def long_running_ae(self) -> Iterator[AsyncExecution]:
        """Start LONG_RUNNING_QUERY and yield its AsyncExecution.

        The query is cancelled during teardown so it does not keep running on
        the warehouse after the test finishes.
        """
        with self.connection() as conn:
            ae = conn.execute_async(LONG_RUNNING_QUERY)
            yield ae
            # Cancellation is idempotent, so this is safe even when the test
            # already cancelled the query itself.
            ae.cancel()

    @pytest.fixture(scope="function")
    def long_ish_ae(self) -> Iterator[AsyncExecution]:
        """Start LONG_ISH_QUERY and yield its AsyncExecution.

        The query is expected to outlast the direct-results window but still
        finish quickly enough for a test to poll it to completion.
        """
        with self.connection() as conn:
            ae = conn.execute_async(LONG_ISH_QUERY)
            yield ae

    def test_execute_async(self):
        """This is a WIP test of the basic API defined in PECO-1263"""
        # This follows the usage example from the design doc.
        with self.connection() as conn:
            ae = conn.execute_async(DIRECT_RESULTS_QUERY, {"param": 1})
            self._wait_for_completion(ae)
            result = ae.get_result().fetchone()
            assert result.col == 1

    def test_direct_results_query_canary(self):
        """Verify that DIRECT_RESULTS_QUERY is already finished when execute_async returns.

        In other words, it was served via direct results and is not running
        even though sync_status() was never called. If this test fails, the
        SQL warehouse got slower at executing this query.
        """
        with self.connection() as conn:
            ae = conn.execute_async(DIRECT_RESULTS_QUERY, {"param": 1})
            assert not ae.is_running

    def test_cancel_running_query(self, long_running_ae: AsyncExecution):
        long_running_ae.cancel()
        assert long_running_ae.status == AsyncExecutionStatus.CANCELED

    def test_cant_get_results_after_cancel(self, long_running_ae: AsyncExecution):
        long_running_ae.cancel()
        with pytest.raises(AsyncExecutionException, match="CANCELED"):
            long_running_ae.get_result()

    def test_get_async_execution_can_check_status(
        self, long_running_ae: AsyncExecution
    ):
        query_id, query_secret = str(long_running_ae.query_id), str(
            long_running_ae.query_secret
        )

        with self.connection() as conn:
            ae = conn.get_async_execution(query_id, query_secret)
            assert ae.is_running

    def test_get_async_execution_can_cancel_across_threads(
        self, long_running_ae: AsyncExecution
    ):
        query_id, query_secret = str(long_running_ae.query_id), str(
            long_running_ae.query_secret
        )
        # Thread.join() does not propagate exceptions raised in the worker, so
        # collect them here and assert on them from the test thread.
        thread_errors = []

        def cancel_query_in_separate_thread(query_id, query_secret):
            try:
                with self.connection() as conn:
                    ae = conn.get_async_execution(query_id, query_secret)
                    ae.cancel()
            except Exception as exc:
                thread_errors.append(exc)

        canceller = threading.Thread(
            target=cancel_query_in_separate_thread, args=(query_id, query_secret)
        )
        canceller.start()
        # Join instead of sleeping a fixed time, so the cancel has definitely
        # been issued before we check the status.
        canceller.join(timeout=self.POLL_TIMEOUT_SECONDS)
        assert not canceller.is_alive(), "cancel thread did not finish in time"
        assert not thread_errors, f"cancel thread raised: {thread_errors!r}"

        long_running_ae.sync_status()
        # Briefly tolerate the new state not being visible yet on this
        # connection; a failed cancel surfaces as a timeout failure.
        self._wait_for_completion(long_running_ae, timeout=30)
        assert long_running_ae.status == AsyncExecutionStatus.CANCELED

    def test_long_ish_query_canary(self, long_ish_ae: AsyncExecution):
        """Verify that LONG_ISH_QUERY needs at least one sync_status() call before it finishes.

        If this test fails, the SQL warehouse got faster at executing this
        query and GT_FIVE_SECONDS_VALUE should be increased.

        It would be easier to do this if Databricks SQL had a SLEEP() function :/
        """
        poll_count = self._wait_for_completion(long_ish_ae)
        assert poll_count > 0

    def test_get_async_execution_and_get_results_without_direct_results(
        self, long_ish_ae: AsyncExecution
    ):
        self._wait_for_completion(long_ish_ae)
        result = long_ish_ae.get_result().fetchone()
        assert len(result) == 1

    def test_get_async_execution_with_bogus_query_id(self):
        with self.connection() as conn:
            with pytest.raises(AsyncExecutionException, match="Query not found"):
                conn.get_async_execution(
                    "bedc786d-64da-45d4-99da-5d3603525803",
                    "ba469f82-cf3f-454e-b575-f4dcd58dd692",
                )

    def test_get_async_execution_with_badly_formed_query_id(self):
        with self.connection() as conn:
            with pytest.raises(
                ValueError, match="badly formed hexadecimal UUID string"
            ):
                conn.get_async_execution("foo", "bar")

    def test_serialize(self, long_running_ae: AsyncExecution):
        serialized = long_running_ae.serialize()
        query_id, query_secret = serialized.split(":")

        with self.connection() as conn:
            ae = conn.get_async_execution(query_id, query_secret)
            assert ae.is_running

    def test_get_async_execution_no_results_when_direct_results_were_sent(self):
        """When DirectResults are sent, the query cannot be fetched again from a separate connection."""

        with self.connection() as conn:
            ae = conn.execute_async(DIRECT_RESULTS_QUERY, {"param": 1})
            query_id, query_secret = ae.serialize().split(":")
            ae.get_result()

        with self.connection() as conn:
            with pytest.raises(AsyncExecutionException, match="Query not found"):
                conn.get_async_execution(query_id, query_secret)

    def test_get_async_execution_and_fetch_results(self, long_ish_ae: AsyncExecution):
        query_id, query_secret = long_ish_ae.serialize().split(":")

        with self.connection() as conn:
            ae = conn.get_async_execution(query_id, query_secret)
            self._wait_for_completion(ae)
            result = ae.get_result().fetchone()
            assert len(result) == 1

    def test_get_async_execution_twice(self):
        """Check that two AsyncExecution objects for one query can both fetch a result.

        The original AsyncExecution object fetches a result, and then a
        separate AsyncExecution object, rebuilt from the serialized
        id/secret on another connection, also fetches a result.
        """
        with self.connection() as conn_1, self.connection() as conn_2:
            ae_1 = conn_1.execute_async(LONG_ISH_QUERY)

            query_id, query_secret = ae_1.serialize().split(":")
            ae_2 = conn_2.get_async_execution(query_id, query_secret)

            self._wait_for_completion(ae_1)

            result_1 = ae_1.get_result().fetchone()
            assert len(result_1) == 1

            ae_2.sync_status()
            assert ae_2.status == AsyncExecutionStatus.FINISHED

            result_2 = ae_2.get_result().fetchone()
            assert len(result_2) == 1