Commit c081542

[data] Split out long running scaling test (#54045)
## Why are these changes needed?

Test `test_arrow_block` has become flaky, often failing because it times out on `test_arrow_batch_gt_2gb`, which is a scaling test that checks whether the Arrow code works with a single 2 GB batch. This splits that test out into its own suite to see if that reduces the likelihood of a timeout (the limit should be 180s). If this does not work, the next step will be to try running it on a larger worker.

## Related issue number

<!-- For example: "Closes #1234" -->

## Checks

- [ ] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
  - [ ] Unit tests
  - [ ] Release tests
  - [ ] This PR is not tested :(

---------

Signed-off-by: Matthew Owen <[email protected]>
1 parent 980c81d commit c081542
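The new test file ends with a `__main__` guard that delegates to pytest, so the split-out suite can also be invoked directly. A minimal sketch, assuming the repository-relative path listed in the new BUILD entry below:

    # Minimal sketch: run only the split-out scaling test via pytest.
    # The path is assumed from the srcs entry in python/ray/data/BUILD;
    # adjust it if running from a different working directory.
    import sys

    import pytest

    if __name__ == "__main__":
        sys.exit(pytest.main(["-v", "python/ray/data/tests/test_arrow_block_scaling.py"]))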

3 files changed (+123 additions, -94 deletions)

python/ray/data/BUILD

Lines changed: 15 additions & 0 deletions
@@ -184,6 +184,21 @@ py_test(
     ],
 )
 
+py_test(
+    name = "test_arrow_block_scaling",
+    size = "large",
+    srcs = ["tests/test_arrow_block_scaling.py"],
+    tags = [
+        "data_non_parallel",
+        "exclusive",
+        "team:data",
+    ],
+    deps = [
+        ":conftest",
+        "//:ray_lib",
+    ],
+)
+
 py_test(
     name = "test_auto_parallelism",
     size = "medium",

python/ray/data/tests/test_arrow_block.py

Lines changed: 0 additions & 94 deletions
@@ -1,4 +1,3 @@
-import gc
 import os
 import sys
 import types
@@ -9,7 +8,6 @@
 import pandas as pd
 import pyarrow as pa
 import pytest
-from pyarrow import parquet as pq
 
 import ray
 from ray._private.test_utils import run_string_as_driver
@@ -147,39 +145,6 @@ def test_to_pylist(self, arr, as_py):
         assert accessor.to_pylist() == arr.to_pylist()
 
 
-@pytest.fixture(scope="module")
-def parquet_dataset_single_column_gt_2gb():
-    chunk_size = 256 * MiB
-    num_chunks = 10
-
-    total_column_size = chunk_size * 10  # ~2.5 GiB
-
-    with TemporaryDirectory() as tmp_dir:
-        dataset_path = f"{tmp_dir}/large_parquet_chunk_{chunk_size}"
-
-        # Create directory
-        os.mkdir(dataset_path)
-
-        for i in range(num_chunks):
-            chunk = b"a" * chunk_size
-
-            d = {"id": [i], "bin": [chunk]}
-            t = pa.Table.from_pydict(d)
-
-            print(f">>> Table schema: {t.schema} (size={sys.getsizeof(t)})")
-
-            filepath = f"{dataset_path}/chunk_{i}.parquet"
-            pq.write_table(t, filepath)
-
-            print(f">>> Created a chunk #{i}")
-
-        print(f">>> Created dataset at {dataset_path}")
-
-        yield dataset_path, num_chunks, total_column_size
-
-        print(f">>> Cleaning up dataset at {dataset_path}")
-
-
 @pytest.fixture(scope="module")
 def binary_dataset_single_file_gt_2gb():
     total_size = int(2.1 * GiB)
@@ -243,65 +208,6 @@ def _id(row):
     assert total == 1
 
 
-@pytest.mark.parametrize(
-    "op",
-    [
-        "map",
-        "map_batches",
-    ],
-)
-def test_arrow_batch_gt_2gb(
-    ray_start_regular,
-    parquet_dataset_single_column_gt_2gb,
-    restore_data_context,
-    op,
-):
-    # Disable (automatic) fallback to `ArrowPythonObjectType` extension type
-    DataContext.get_current().enable_fallback_to_arrow_object_ext_type = False
-
-    dataset_path, num_rows, total_column_size = parquet_dataset_single_column_gt_2gb
-
-    def _id(x):
-        return x
-
-    ds = ray.data.read_parquet(dataset_path)
-
-    if op == "map":
-        ds = ds.map(_id)
-    elif op == "map_batches":
-        # Combine all rows into a single batch using `map_batches` coercing to
-        # numpy format
-        ds = ds.map_batches(
-            _id,
-            batch_format="numpy",
-            batch_size=num_rows,
-            zero_copy_batch=False,
-        )
-
-    batch = ds.take_batch()
-
-    total_binary_column_size = sum([len(b) for b in batch["bin"]])
-
-    print(
-        f">>> Batch:\n"
-        f"------\n"
-        "Column: 'id'\n"
-        f"Values: {batch['id']}\n"
-        f"------\n"
-        "Column: 'bin'\n"
-        f"Total: {total_binary_column_size / GiB} GiB\n"
-        f"Values: {[str(v)[:3] + ' x ' + str(len(v)) for v in batch['bin']]}\n"
-    )
-
-    assert total_binary_column_size == total_column_size
-
-    # Clean up refs
-    del batch
-    del ds
-    # Force GC to free up object store memory
-    gc.collect()
-
-
 @pytest.mark.parametrize(
     "input_,expected_output",
     [

python/ray/data/tests/test_arrow_block_scaling.py

Lines changed: 108 additions & 0 deletions

@@ -0,0 +1,108 @@
+import gc
+import os
+import sys
+from tempfile import TemporaryDirectory
+
+import pyarrow as pa
+import pytest
+from pyarrow import parquet as pq
+
+import ray
+from ray.data import DataContext
+from ray.data._internal.util import GiB, MiB
+
+
+@pytest.fixture(scope="module")
+def parquet_dataset_single_column_gt_2gb():
+    chunk_size = 256 * MiB
+    num_chunks = 10
+
+    total_column_size = chunk_size * 10  # ~2.5 GiB
+
+    with TemporaryDirectory() as tmp_dir:
+        dataset_path = f"{tmp_dir}/large_parquet_chunk_{chunk_size}"
+
+        # Create directory
+        os.mkdir(dataset_path)
+
+        for i in range(num_chunks):
+            chunk = b"a" * chunk_size
+
+            d = {"id": [i], "bin": [chunk]}
+            t = pa.Table.from_pydict(d)
+
+            print(f">>> Table schema: {t.schema} (size={sys.getsizeof(t)})")
+
+            filepath = f"{dataset_path}/chunk_{i}.parquet"
+            pq.write_table(t, filepath)
+
+            print(f">>> Created a chunk #{i}")
+
+        print(f">>> Created dataset at {dataset_path}")
+
+        yield dataset_path, num_chunks, total_column_size
+
+        print(f">>> Cleaning up dataset at {dataset_path}")
+
+
+@pytest.mark.parametrize(
+    "op",
+    [
+        "map",
+        "map_batches",
+    ],
+)
+def test_arrow_batch_gt_2gb(
+    ray_start_regular,
+    parquet_dataset_single_column_gt_2gb,
+    restore_data_context,
+    op,
+):
+    # Disable (automatic) fallback to `ArrowPythonObjectType` extension type
+    DataContext.get_current().enable_fallback_to_arrow_object_ext_type = False
+
+    dataset_path, num_rows, total_column_size = parquet_dataset_single_column_gt_2gb
+
+    def _id(x):
+        return x
+
+    ds = ray.data.read_parquet(dataset_path)
+
+    if op == "map":
+        ds = ds.map(_id)
+    elif op == "map_batches":
+        # Combine all rows into a single batch using `map_batches` coercing to
+        # numpy format
+        ds = ds.map_batches(
+            _id,
+            batch_format="numpy",
+            batch_size=num_rows,
+            zero_copy_batch=False,
+        )
+
+    batch = ds.take_batch()
+
+    total_binary_column_size = sum([len(b) for b in batch["bin"]])
+
+    print(
+        f">>> Batch:\n"
+        f"------\n"
+        "Column: 'id'\n"
+        f"Values: {batch['id']}\n"
+        f"------\n"
+        "Column: 'bin'\n"
+        f"Total: {total_binary_column_size / GiB} GiB\n"
+        f"Values: {[str(v)[:3] + ' x ' + str(len(v)) for v in batch['bin']]}\n"
+    )
+
+    assert total_binary_column_size == total_column_size
+
+    # Clean up refs
+    del batch
+    del ds
+    # Force GC to free up object store memory
+    gc.collect()
+
+
+if __name__ == "__main__":
+    sys.exit(pytest.main(["-v", __file__]))

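As a quick sanity check of the fixture's sizing above (a hedged sketch; `MiB` and `GiB` are assumed to be the standard binary units, as in `ray.data._internal.util`), ten 256 MiB chunks add up to 2.5 GiB of binary data, comfortably above the 2 GiB single-batch threshold the test targets:

    # Hedged arithmetic check of the fixture sizing shown in the diff above.
    MiB = 1024 * 1024
    GiB = 1024 * MiB

    chunk_size = 256 * MiB
    num_chunks = 10
    total_column_size = chunk_size * num_chunks

    assert total_column_size == 2.5 * GiB  # ~2.5 GiB in the "bin" column
    assert total_column_size > 2 * GiB     # exceeds the 2 GiB single-batch threshold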