|
16 | 16 | # under the License. |
17 | 17 | # pylint:disable=redefined-outer-name |
18 | 18 |
|
| 19 | +import contextlib |
19 | 20 | import math |
| 21 | +import os |
20 | 22 | import time |
21 | 23 | import uuid |
| 24 | +import warnings |
| 25 | +from collections.abc import Iterator |
22 | 26 | from datetime import datetime, timedelta |
23 | 27 | from pathlib import PosixPath |
24 | 28 | from typing import Any |
@@ -1272,3 +1276,114 @@ def test_scan_source_field_missing_in_spec(catalog: Catalog, spark: SparkSession |
1272 | 1276 |
|
1273 | 1277 | table = catalog.load_table(identifier) |
1274 | 1278 | assert len(list(table.scan().plan_files())) == 3 |
| 1279 | + |
| 1280 | + |
| 1281 | +HAS_PYICEBERG_CORE = False |
| 1282 | +try: |
| 1283 | + import importlib.util # noqa: E402 |
| 1284 | + |
| 1285 | + if importlib.util.find_spec("pyiceberg_core") is not None: |
| 1286 | + import pyiceberg_core.scan # noqa: F401 |
| 1287 | + |
| 1288 | + HAS_PYICEBERG_CORE = True |
| 1289 | +except (ImportError, NotImplementedError): |
| 1290 | + pass |
| 1291 | + |
| 1292 | +requires_pyiceberg_core = pytest.mark.skipif( |
| 1293 | + not HAS_PYICEBERG_CORE, reason="pyiceberg-core is not installed or lacks native scan bindings" |
| 1294 | +) |
| 1295 | + |
| 1296 | + |
| 1297 | +@contextlib.contextmanager |
| 1298 | +def env_var(key: str, value: str) -> Iterator[None]: |
| 1299 | + old_value = os.environ.get(key) |
| 1300 | + os.environ[key] = value |
| 1301 | + try: |
| 1302 | + yield |
| 1303 | + finally: |
| 1304 | + if old_value is None: |
| 1305 | + del os.environ[key] |
| 1306 | + else: |
| 1307 | + os.environ[key] = old_value |
| 1308 | + |
| 1309 | + |
| 1310 | +def assert_no_native_scan_fallback(caught_warnings: list[warnings.WarningMessage]) -> None: |
| 1311 | + fallback_warnings = [ |
| 1312 | + warning |
| 1313 | + for warning in caught_warnings |
| 1314 | + if "Falling back to PyArrow scan because pyiceberg-core cannot handle this scan" in str(warning.message) |
| 1315 | + ] |
| 1316 | + assert fallback_warnings == [] |
| 1317 | + |
| 1318 | + |
| 1319 | +def assert_same_column_names(left: pa.Table, right: pa.Table) -> None: |
| 1320 | + assert left.schema.names == right.schema.names |
| 1321 | + |
| 1322 | + |
| 1323 | +def assert_same_rows(left: pa.Table, right: pa.Table) -> None: |
| 1324 | + assert sorted(left.to_pylist(), key=repr) == sorted(right.to_pylist(), key=repr) |
| 1325 | + |
| 1326 | + |
| 1327 | +def assert_native_scan_eligible(scan: Any) -> None: |
| 1328 | + from pyiceberg.io.pyiceberg_core import can_read_projected_schema_with_pyiceberg_core |
| 1329 | + |
| 1330 | + assert can_read_projected_schema_with_pyiceberg_core( |
| 1331 | + scan.table_metadata.schema(), |
| 1332 | + scan.projection(), |
| 1333 | + scan.row_filter, |
| 1334 | + scan.case_sensitive, |
| 1335 | + ) |
| 1336 | + |
| 1337 | + |
| 1338 | +def assert_scan_has_delete_files(scan: Any) -> None: |
| 1339 | + assert any(task.delete_files for task in scan.plan_files()) |
| 1340 | + |
| 1341 | + |
| 1342 | +@pytest.mark.integration |
| 1343 | +@requires_pyiceberg_core |
| 1344 | +@pytest.mark.parametrize("catalog", [lf("session_catalog_hive"), lf("session_catalog")]) |
| 1345 | +def test_native_arrow_scan_comparisons(catalog: Catalog) -> None: |
| 1346 | + # 1. Unpartitioned table |
| 1347 | + table_limit = catalog.load_table("default.test_limit") |
| 1348 | + |
| 1349 | + # 2. Partitioned table |
| 1350 | + table_partitioned = catalog.load_table("default.test_partitioned_by_hours") |
| 1351 | + |
| 1352 | + # 3. Table with positional deletes |
| 1353 | + table_deletes = catalog.load_table("default.test_positional_mor_deletes_v2") |
| 1354 | + |
| 1355 | + scans = [ |
| 1356 | + # Unpartitioned table scans |
| 1357 | + (table_limit.scan(), False), |
| 1358 | + (table_limit.scan(selected_fields=("idx",)), False), |
| 1359 | + (table_limit.scan(row_filter="idx > 5", selected_fields=("idx",)), False), |
| 1360 | + (table_limit.scan(limit=3), False), |
| 1361 | + (table_limit.scan(limit=0), False), |
| 1362 | + (table_limit.scan(limit=999), False), |
| 1363 | + # Partitioned table scans |
| 1364 | + (table_partitioned.scan(), False), |
| 1365 | + (table_partitioned.scan(selected_fields=("ts",)), False), |
| 1366 | + ] |
| 1367 | + scans.extend( |
| 1368 | + [ |
| 1369 | + (table_deletes.scan(), True), |
| 1370 | + (table_deletes.scan(row_filter="letter >= 'e'", limit=2), True), |
| 1371 | + ] |
| 1372 | + ) |
| 1373 | + |
| 1374 | + for scan, expect_delete_files in scans: |
| 1375 | + assert_native_scan_eligible(scan) |
| 1376 | + if expect_delete_files: |
| 1377 | + assert_scan_has_delete_files(scan) |
| 1378 | + |
| 1379 | + with env_var("PYICEBERG_RUST_ARROW_SCAN", "0"): |
| 1380 | + pyarrow_reader_table = scan.to_arrow_batch_reader().read_all() |
| 1381 | + |
| 1382 | + with env_var("PYICEBERG_RUST_ARROW_SCAN", "1"): |
| 1383 | + with warnings.catch_warnings(record=True) as caught_warnings: |
| 1384 | + warnings.simplefilter("always") |
| 1385 | + native_reader_table = scan.to_arrow_batch_reader().read_all() |
| 1386 | + |
| 1387 | + assert_no_native_scan_fallback(caught_warnings) |
| 1388 | + assert_same_column_names(pyarrow_reader_table, native_reader_table) |
| 1389 | + assert_same_rows(pyarrow_reader_table, native_reader_table) |
0 commit comments