diff --git a/python/pyarrow-stubs/pyarrow/_dataset.pyi b/python/pyarrow-stubs/pyarrow/_dataset.pyi
new file mode 100644
index 000000000000..c8cd3d970890
--- /dev/null
+++ b/python/pyarrow-stubs/pyarrow/_dataset.pyi
@@ -0,0 +1,682 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import sys
+
+if sys.version_info >= (3, 11):
+    from typing import Self
+else:
+    from typing_extensions import Self
+from collections.abc import Collection, Callable, Iterator, Iterable
+from typing import (
+    IO,
+    Any,
+    Generic,
+    Literal,
+    NamedTuple,
+    TypeVar,
+)
+
+from _typeshed import StrPath
+
+from . import csv, _json, _parquet, lib
+from ._fs import FileSelector, FileSystem, SupportedFileSystem
+from ._stubs_typing import Indices, JoinType, Order
+from .acero import ExecNodeOptions
+from .compute import Expression
+from .ipc import IpcWriteOptions, RecordBatchReader
+
+
+class Dataset(lib._Weakrefable):  # base of the dataset classes in this stub
+    @property
+    def partition_expression(self) -> Expression: ...
+
+    def replace_schema(self, schema: lib.Schema) -> Self: ...
+
+    def get_fragments(self, filter: Expression | None = None) -> Iterator[Fragment]: ...
+
+    def scanner(
+        self,
+        columns: list[str] | dict[str, Expression] | None = None,
+        filter: Expression | None = None,
+        batch_size: int = ...,
+        batch_readahead: int = 16,
+        fragment_readahead: int = 4,
+        fragment_scan_options: FragmentScanOptions | None = None,
+        use_threads: bool = True,
+        cache_metadata: bool = True,
+        memory_pool: lib.MemoryPool | None = None,
+    ) -> Scanner: ...
+
+    def to_batches(
+        self,
+        columns: list[str] | None = None,
+        filter: Expression | None = None,
+        batch_size: int = ...,
+        batch_readahead: int = 16,
+        fragment_readahead: int = 4,
+        fragment_scan_options: FragmentScanOptions | None = None,
+        use_threads: bool = True,
+        cache_metadata: bool = True,
+        memory_pool: lib.MemoryPool | None = None,
+    ) -> Iterator[lib.RecordBatch]: ...
+
+    def to_table(
+        self,
+        columns: list[str] | dict[str, Expression] | None = None,
+        filter: Expression | None = None,
+        batch_size: int = ...,
+        batch_readahead: int = 16,
+        fragment_readahead: int = 4,
+        fragment_scan_options: FragmentScanOptions | None = None,
+        use_threads: bool = True,
+        cache_metadata: bool = True,
+        memory_pool: lib.MemoryPool | None = None,
+    ) -> lib.Table: ...
+
+    def take(
+        self,
+        indices: Indices,
+        columns: list[str] | None = None,
+        filter: Expression | None = None,
+        batch_size: int = ...,
+        batch_readahead: int = 16,
+        fragment_readahead: int = 4,
+        fragment_scan_options: FragmentScanOptions | None = None,
+        use_threads: bool = True,
+        cache_metadata: bool = True,
+        memory_pool: lib.MemoryPool | None = None,
+    ) -> lib.Table: ...
+
+    def head(
+        self,
+        num_rows: int,
+        columns: list[str] | None = None,
+        filter: Expression | None = None,
+        batch_size: int = ...,
+        batch_readahead: int = 16,
+        fragment_readahead: int = 4,
+        fragment_scan_options: FragmentScanOptions | None = None,
+        use_threads: bool = True,
+        cache_metadata: bool = True,
+        memory_pool: lib.MemoryPool | None = None,
+    ) -> lib.Table: ...
+
+    def count_rows(
+        self,
+        filter: Expression | None = None,
+        batch_size: int = ...,
+        batch_readahead: int = 16,
+        fragment_readahead: int = 4,
+        fragment_scan_options: FragmentScanOptions | None = None,
+        use_threads: bool = True,
+        cache_metadata: bool = True,
+        memory_pool: lib.MemoryPool | None = None,
+    ) -> int: ...
+
+    @property
+    def schema(self) -> lib.Schema: ...
+
+    def filter(self, expression: Expression | None) -> Self: ...
+
+    def sort_by(self, sorting: str |
+                list[tuple[str, Order]], **kwargs) -> InMemoryDataset: ...
+
+    def join(
+        self,
+        right_dataset: Dataset,
+        keys: str | list[str],
+        right_keys: str | list[str] | None = None,
+        join_type: JoinType = "left outer",
+        left_suffix: str | None = None,
+        right_suffix: str | None = None,
+        coalesce_keys: bool = True,
+        use_threads: bool = True,
+    ) -> InMemoryDataset: ...
+
+    def join_asof(
+        self,
+        right_dataset: Dataset,
+        on: str,
+        by: str | list[str],
+        tolerance: int,
+        right_on: str | list[str] | None = None,
+        right_by: str | list[str] | None = None,
+    ) -> InMemoryDataset: ...
+
+    @property
+    def format(self) -> FileFormat: ...
+
+
+class InMemoryDataset(Dataset):
+    def __init__(
+        self,
+        source: lib.Table
+        | lib.RecordBatch
+        | lib.RecordBatchReader
+        | Iterable[lib.RecordBatch]
+        | list[Any],
+        schema: lib.Schema | None = None,
+    ) -> None: ...
+
+
+class UnionDataset(Dataset):  # exposes its member datasets via ``children``
+    def __init__(
+        self,
+        schema: lib.Schema | None = None,
+        children: list[Dataset] | None = None,
+    ) -> None: ...
+
+    @property
+    def children(self) -> list[Dataset]: ...
+
+
+class FileSystemDataset(Dataset):
+    def __init__(
+        self,
+        fragments: list[Fragment],
+        schema: lib.Schema,
+        format: FileFormat,
+        filesystem: SupportedFileSystem | None = None,
+        root_partition: Expression | None = None,
+    ) -> None: ...
+
+    @classmethod
+    def from_paths(
+        cls,
+        paths: list[str],
+        schema: lib.Schema | None = None,
+        format: FileFormat | None = None,
+        filesystem: SupportedFileSystem | None = None,
+        partitions: list[Expression] | None = None,
+        root_partition: Expression | None = None,
+    ) -> FileSystemDataset: ...
+
+    @property
+    def filesystem(self) -> FileSystem: ...
+    @property
+    def partitioning(self) -> Partitioning | None: ...
+
+    @property
+    def files(self) -> list[str]: ...
+
+
+class FileWriteOptions(lib._Weakrefable):
+    @property
+    def format(self) -> FileFormat: ...
+
+
+class FileFormat(lib._Weakrefable):
+    def inspect(
+        self, file: StrPath | IO, filesystem: SupportedFileSystem | None = None
+    ) -> lib.Schema: ...
+
+    def make_fragment(
+        self,
+        file: StrPath | IO | lib.Buffer | lib.BufferReader,
+        filesystem: SupportedFileSystem | None = None,
+        partition_expression: Expression | None = None,
+        *,
+        file_size: int | None = None,
+    ) -> Fragment: ...
+
+    def make_write_options(self) -> FileWriteOptions: ...
+    @property
+    def default_extname(self) -> str: ...
+    @property
+    def default_fragment_scan_options(self) -> FragmentScanOptions: ...
+    @default_fragment_scan_options.setter
+    def default_fragment_scan_options(self, options: FragmentScanOptions) -> None: ...
+
+
+class Fragment(lib._Weakrefable):
+    def open(self) -> lib.NativeFile | lib.BufferReader: ...
+    @property
+    def path(self) -> str: ...
+    @property
+    def row_groups(self) -> list[int]: ...
+
+    @property
+    def filesystem(self) -> SupportedFileSystem: ...
+
+    @property
+    def physical_schema(self) -> lib.Schema: ...
+
+    @property
+    def partition_expression(self) -> Expression: ...
+
+    def scanner(
+        self,
+        schema: lib.Schema | None = None,
+        columns: list[str] | None = None,
+        filter: Expression | None = None,
+        batch_size: int = ...,
+        batch_readahead: int = 16,
+        fragment_readahead: int = 4,
+        fragment_scan_options: FragmentScanOptions | None = None,
+        use_threads: bool = True,
+        cache_metadata: bool = True,
+        memory_pool: lib.MemoryPool | None = None,
+    ) -> Scanner: ...
+
+    def to_batches(
+        self,
+        schema: lib.Schema | None = None,
+        columns: list[str] | None = None,
+        filter: Expression | None = None,
+        batch_size: int = ...,
+        batch_readahead: int = 16,
+        fragment_readahead: int = 4,
+        fragment_scan_options: FragmentScanOptions | None = None,
+        use_threads: bool = True,
+        cache_metadata: bool = True,
+        memory_pool: lib.MemoryPool | None = None,
+    ) -> Iterator[lib.RecordBatch]: ...
+
+    def to_table(
+        self,
+        schema: lib.Schema | None = None,
+        columns: list[str] | None = None,
+        filter: Expression | None = None,
+        batch_size: int = ...,
+        batch_readahead: int = 16,
+        fragment_readahead: int = 4,
+        fragment_scan_options: FragmentScanOptions | None = None,
+        use_threads: bool = True,
+        cache_metadata: bool = True,
+        memory_pool: lib.MemoryPool | None = None,
+    ) -> lib.Table: ...
+
+    def take(
+        self,
+        indices: Indices,
+        columns: list[str] | None = None,
+        filter: Expression | None = None,
+        batch_size: int = ...,
+        batch_readahead: int = 16,
+        fragment_readahead: int = 4,
+        fragment_scan_options: FragmentScanOptions | None = None,
+        use_threads: bool = True,
+        cache_metadata: bool = True,
+        memory_pool: lib.MemoryPool | None = None,
+    ) -> lib.Table: ...
+
+    def head(
+        self,
+        num_rows: int,
+        columns: list[str] | None = None,
+        filter: Expression | None = None,
+        batch_size: int = ...,
+        batch_readahead: int = 16,
+        fragment_readahead: int = 4,
+        fragment_scan_options: FragmentScanOptions | None = None,
+        use_threads: bool = True,
+        cache_metadata: bool = True,
+        memory_pool: lib.MemoryPool | None = None,
+    ) -> lib.Table: ...
+
+    def count_rows(
+        self,
+        columns: list[str] | None = None,  # NOTE(review): confirm runtime accepts this
+        filter: Expression | None = None,
+        batch_size: int = ...,
+        batch_readahead: int = 16,
+        fragment_readahead: int = 4,
+        fragment_scan_options: FragmentScanOptions | None = None,
+        use_threads: bool = True,
+        cache_metadata: bool = True,
+        memory_pool: lib.MemoryPool | None = None,
+    ) -> int: ...
+
+
+class FileFragment(Fragment):
+    def open(self) -> lib.NativeFile: ...
+
+    @property
+    def path(self) -> str: ...
+
+    @property
+    def filesystem(self) -> FileSystem: ...
+
+    @property
+    def buffer(self) -> lib.Buffer: ...
+
+    @property
+    def format(self) -> FileFormat: ...
+
+
+class FragmentScanOptions(lib._Weakrefable):
+    @property
+    def type_name(self) -> str: ...
+
+
+class IpcFileWriteOptions(FileWriteOptions):
+    @property
+    def write_options(self) -> IpcWriteOptions: ...
+    @write_options.setter
+    def write_options(self, write_options: IpcWriteOptions) -> None: ...
+
+
+class IpcFileFormat(FileFormat):
+    def equals(self, other: IpcFileFormat) -> bool: ...
+    def make_write_options(self, **kwargs) -> IpcFileWriteOptions: ...
+    @property
+    def default_extname(self) -> str: ...
+
+
+class FeatherFileFormat(IpcFileFormat):  # declares nothing beyond IpcFileFormat
+    ...
+
+
+class CsvFileFormat(FileFormat):
+    def __init__(
+        self,
+        parse_options: csv.ParseOptions | None = None,
+        default_fragment_scan_options: CsvFragmentScanOptions | None = None,
+        convert_options: csv.ConvertOptions | None = None,
+        read_options: csv.ReadOptions | None = None,
+    ) -> None: ...
+    def make_write_options(
+        self, **kwargs) -> CsvFileWriteOptions: ...  # type: ignore[override]
+
+    @property
+    def parse_options(self) -> csv.ParseOptions: ...
+    @parse_options.setter
+    def parse_options(self, parse_options: csv.ParseOptions) -> None: ...
+    def equals(self, other: CsvFileFormat) -> bool: ...
+
+
+class CsvFragmentScanOptions(FragmentScanOptions):
+    convert_options: csv.ConvertOptions
+    read_options: csv.ReadOptions
+
+    def __init__(
+        self,
+        convert_options: csv.ConvertOptions | None = None,
+        read_options: csv.ReadOptions | None = None,
+    ) -> None: ...
+    def equals(self, other: CsvFragmentScanOptions) -> bool: ...
+
+
+class CsvFileWriteOptions(FileWriteOptions):
+    write_options: csv.WriteOptions
+
+
+class JsonFileFormat(FileFormat):
+    def __init__(
+        self,
+        default_fragment_scan_options: JsonFragmentScanOptions | None = None,
+        parse_options: _json.ParseOptions | None = None,
+        read_options: _json.ReadOptions | None = None,
+    ) -> None: ...
+    def equals(self, other: JsonFileFormat) -> bool: ...
+
+
+class JsonFragmentScanOptions(FragmentScanOptions):
+    parse_options: _json.ParseOptions
+    read_options: _json.ReadOptions
+
+    def __init__(
+        self,
+        parse_options: _json.ParseOptions | None = None,
+        read_options: _json.ReadOptions | None = None,
+    ) -> None: ...
+    def equals(self, other: JsonFragmentScanOptions) -> bool: ...
+
+
+class Partitioning(lib._Weakrefable):  # base of the partitioning classes below
+    def parse(self, path: str) -> Expression: ...
+
+    def format(self, expr: Expression) -> tuple[str, str]: ...
+
+    @property
+    def schema(self) -> lib.Schema: ...
+
+    @property
+    def dictionaries(self) -> list[Any]: ...
+
+
+class PartitioningFactory(lib._Weakrefable):  # returned by the discover() methods
+    @property
+    def type_name(self) -> str: ...
+
+
+class KeyValuePartitioning(Partitioning):
+    @property
+    def dictionaries(self) -> list[Any]: ...
+
+
+class DirectoryPartitioning(KeyValuePartitioning):
+    @staticmethod
+    def discover(
+        field_names: list[str] | None = None,
+        infer_dictionary: bool = False,
+        max_partition_dictionary_size: int = 0,
+        schema: lib.Schema | None = None,
+        segment_encoding: Literal["uri", "none"] = "uri",
+    ) -> PartitioningFactory: ...
+
+    def __init__(
+        self,
+        schema: lib.Schema,
+        dictionaries: dict[str, lib.Array] | None = None,
+        segment_encoding: Literal["uri", "none"] = "uri",
+    ) -> None: ...
+
+
+class HivePartitioning(KeyValuePartitioning):
+    def __init__(
+        self,
+        schema: lib.Schema,
+        dictionaries: dict[str, lib.Array] | None = None,
+        null_fallback: str = "__HIVE_DEFAULT_PARTITION__",
+        segment_encoding: Literal["uri", "none"] = "uri",
+    ) -> None: ...
+
+    @staticmethod
+    def discover(
+        infer_dictionary: bool = False,
+        max_partition_dictionary_size: int = 0,
+        null_fallback: str = "__HIVE_DEFAULT_PARTITION__",  # typed like __init__
+        schema: lib.Schema | None = None,
+        segment_encoding: Literal["uri", "none"] = "uri",
+    ) -> PartitioningFactory: ...
+
+
+class FilenamePartitioning(KeyValuePartitioning):
+    def __init__(
+        self,
+        schema: lib.Schema,
+        dictionaries: dict[str, lib.Array] | None = None,
+        segment_encoding: Literal["uri", "none"] = "uri",
+    ) -> None: ...
+
+    @staticmethod
+    def discover(
+        field_names: list[str] | None = None,
+        infer_dictionary: bool = False,
+        schema: lib.Schema | None = None,
+        segment_encoding: Literal["uri", "none"] = "uri",
+    ) -> PartitioningFactory: ...
+
+
+class DatasetFactory(lib._Weakrefable):
+    root_partition: Expression
+    def finish(self, schema: lib.Schema | None = None) -> Dataset: ...
+
+    def inspect(
+        self,
+        *,
+        promote_options: str = "default",
+        fragments: list[Fragment] | int | str | None = None,
+    ) -> lib.Schema: ...
+
+    def inspect_schemas(self) -> list[lib.Schema]: ...
+
+
+class FileSystemFactoryOptions(lib._Weakrefable):
+    partitioning: Partitioning
+    partitioning_factory: PartitioningFactory
+    partition_base_dir: str
+    exclude_invalid_files: bool
+    selector_ignore_prefixes: list[str]
+
+    def __init__(
+        self,
+        partition_base_dir: str | None = None,
+        partitioning: Partitioning | PartitioningFactory | None = None,
+        exclude_invalid_files: bool | None = True,
+        selector_ignore_prefixes: list[str] | None = None,
+    ) -> None: ...
+
+
+class FileSystemDatasetFactory(DatasetFactory):
+    def __init__(
+        self,
+        filesystem: SupportedFileSystem,
+        paths_or_selector: Collection[str] | FileSelector,
+        format: FileFormat,
+        options: FileSystemFactoryOptions | None = None,
+    ) -> None: ...
+
+
+class UnionDatasetFactory(DatasetFactory):
+    def __init__(self, factories: list[DatasetFactory]) -> None: ...
+
+
+_RecordBatchT = TypeVar("_RecordBatchT", bound=lib.RecordBatch)
+
+
+class RecordBatchIterator(lib._Weakrefable, Generic[_RecordBatchT]):
+    def __iter__(self) -> Self: ...
+    def __next__(self) -> _RecordBatchT: ...
+
+
+class TaggedRecordBatch(NamedTuple):  # a record batch paired with a fragment
+    record_batch: lib.RecordBatch
+    fragment: Fragment
+
+
+class TaggedRecordBatchIterator(lib._Weakrefable):
+    def __iter__(self) -> Self: ...
+    def __next__(self) -> TaggedRecordBatch: ...
+
+
+class Scanner(lib._Weakrefable):
+    @staticmethod
+    def from_dataset(
+        dataset: Dataset,
+        *,
+        columns: list[str] | dict[str, Expression] | None = None,
+        filter: Expression | None = None,
+        batch_size: int = ...,
+        batch_readahead: int = 16,
+        fragment_readahead: int = 4,
+        fragment_scan_options: FragmentScanOptions | None = None,
+        use_threads: bool = True,
+        cache_metadata: bool = True,
+        memory_pool: lib.MemoryPool | None = None,
+    ) -> Scanner: ...
+
+    @staticmethod
+    def from_fragment(
+        fragment: Fragment,
+        *,
+        schema: lib.Schema | None = None,
+        columns: list[str] | dict[str, Expression] | None = None,
+        filter: Expression | None = None,
+        batch_size: int = ...,
+        batch_readahead: int = 16,
+        fragment_readahead: int = 4,
+        fragment_scan_options: FragmentScanOptions | None = None,
+        use_threads: bool = True,
+        cache_metadata: bool = True,
+        memory_pool: lib.MemoryPool | None = None,
+    ) -> Scanner: ...
+
+    @staticmethod
+    def from_batches(
+        source: Iterator[lib.RecordBatch] | RecordBatchReader | Any,
+        *,
+        schema: lib.Schema | None = None,
+        columns: list[str] | dict[str, Expression] | None = None,
+        filter: Expression | None = None,
+        batch_size: int = ...,
+        batch_readahead: int = 16,
+        fragment_readahead: int = 4,
+        fragment_scan_options: FragmentScanOptions | None = None,
+        use_threads: bool = True,
+        cache_metadata: bool = True,
+        memory_pool: lib.MemoryPool | None = None,
+    ) -> Scanner: ...
+
+    @property
+    def dataset_schema(self) -> lib.Schema: ...
+
+    @property
+    def projected_schema(self) -> lib.Schema: ...
+
+    def to_batches(self) -> Iterator[lib.RecordBatch]: ...
+
+    def scan_batches(self) -> TaggedRecordBatchIterator: ...
+
+    def to_table(self) -> lib.Table: ...
+
+    def take(self, indices: Indices) -> lib.Table: ...
+
+    def head(self, num_rows: int) -> lib.Table: ...
+
+    def count_rows(self) -> int: ...
+
+    def to_reader(self) -> RecordBatchReader: ...
+
+
+def get_partition_keys(partition_expression: Expression) -> dict[str, Any]: ...
+
+
+class WrittenFile(lib._Weakrefable):
+    def __init__(self, path: str, metadata: _parquet.FileMetaData |
+                 None, size: int) -> None: ...
+
+
+def _filesystemdataset_write(
+    data: Scanner,
+    base_dir: StrPath,
+    basename_template: str,
+    filesystem: SupportedFileSystem,
+    partitioning: Partitioning,
+    preserve_order: bool,
+    file_options: FileWriteOptions,
+    max_partitions: int,
+    file_visitor: Callable[[str], None] | None,  # NOTE(review): arg may be WrittenFile
+    existing_data_behavior: Literal["error", "overwrite_or_ignore", "delete_matching"],
+    max_open_files: int,
+    max_rows_per_file: int,
+    min_rows_per_group: int,
+    max_rows_per_group: int,
+    create_dir: bool,
+) -> None: ...
+
+
+class _ScanNodeOptions(ExecNodeOptions):
+    def _set_options(self, dataset: Dataset, scan_options: dict) -> None: ...
+
+
+class ScanNodeOptions(_ScanNodeOptions):
+    def __init__(
+        self, dataset: Dataset, require_sequenced_output: bool = False, **kwargs
+    ) -> None: ...
diff --git a/python/pyarrow-stubs/pyarrow/_dataset_orc.pyi b/python/pyarrow-stubs/pyarrow/_dataset_orc.pyi
new file mode 100644
index 000000000000..62f49bf5d301
--- /dev/null
+++ b/python/pyarrow-stubs/pyarrow/_dataset_orc.pyi
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from ._dataset import FileFormat
+
+
+class OrcFileFormat(FileFormat):
+    def equals(self, other: OrcFileFormat) -> bool: ...
+    @property
+    def default_extname(self) -> str: ...
diff --git a/python/pyarrow-stubs/pyarrow/_dataset_parquet.pyi b/python/pyarrow-stubs/pyarrow/_dataset_parquet.pyi
new file mode 100644
index 000000000000..6c27e3c8a93e
--- /dev/null
+++ b/python/pyarrow-stubs/pyarrow/_dataset_parquet.pyi
@@ -0,0 +1,200 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from collections.abc import Iterable
+from dataclasses import dataclass
+from typing import IO, Any, TypedDict
+
+from _typeshed import StrPath
+
+from ._compute import Expression
+from ._dataset import (
+    DatasetFactory,
+    FileFormat,
+    FileFragment,
+    FileWriteOptions,
+    Fragment,
+    FragmentScanOptions,
+    Partitioning,
+    PartitioningFactory,
+)
+from ._dataset_parquet_encryption import ParquetDecryptionConfig
+from ._fs import SupportedFileSystem
+from ._parquet import FileDecryptionProperties, FileMetaData
+from ._types import DataType, LargeListType, ListType
+from .lib import CacheOptions, Schema, _Weakrefable, NativeFile, Buffer, BufferReader
+
+parquet_encryption_enabled: bool
+
+
+class ParquetFileFormat(FileFormat):
+    def __init__(
+        self,
+        read_options: ParquetReadOptions | None = None,
+        default_fragment_scan_options: ParquetFragmentScanOptions | None = None,
+        *,
+        pre_buffer: bool = True,
+        coerce_int96_timestamp_unit: str | None = None,
+        thrift_string_size_limit: int | None = None,
+        thrift_container_size_limit: int | None = None,
+        page_checksum_verification: bool = False,
+        arrow_extensions_enabled: bool = True,
+        binary_type: DataType | None = None,
+        list_type: type[ListType | LargeListType] | None = None,
+        use_buffered_stream: bool = False,
+        buffer_size: int = 8192,
+        dictionary_columns: list[str] | set[str] | None = None,
+        decryption_properties: FileDecryptionProperties | None = None,
+    ) -> None: ...
+    @property
+    def read_options(self) -> ParquetReadOptions: ...
+    def make_write_options(
+        self, **kwargs) -> ParquetFileWriteOptions: ...  # type: ignore[override]
+
+    def equals(self, other: ParquetFileFormat) -> bool: ...
+    @property
+    def default_extname(self) -> str: ...
+
+    def make_fragment(
+        self,
+        file: StrPath | IO | Buffer | BufferReader,
+        filesystem: SupportedFileSystem | None = None,
+        partition_expression: Expression | None = None,
+        row_groups: Iterable[int] | None = None,
+        *,
+        file_size: int | None = None,
+    ) -> Fragment: ...
+
+
+class _NameStats(TypedDict):  # value type of RowGroupInfo.statistics
+    min: Any
+    max: Any
+
+
+class RowGroupInfo:
+    id: int
+    metadata: FileMetaData
+    schema: Schema
+
+    def __init__(self, id: int, metadata: FileMetaData, schema: Schema) -> None: ...
+    @property
+    def num_rows(self) -> int: ...
+    @property
+    def total_byte_size(self) -> int: ...
+    @property
+    def statistics(self) -> dict[str, _NameStats]: ...
+
+
+class ParquetFileFragment(FileFragment):
+    def ensure_complete_metadata(self) -> None: ...
+    @property
+    def path(self) -> str: ...
+    @property
+    def filesystem(self) -> SupportedFileSystem: ...
+    def open(self) -> NativeFile: ...
+
+    @property
+    def row_groups(self) -> list[int]: ...
+    @property
+    def metadata(self) -> FileMetaData: ...
+    @property
+    def num_row_groups(self) -> int: ...
+
+    def split_by_row_group(
+        self, filter: Expression | None = None, schema: Schema | None = None
+    ) -> list[Fragment]: ...
+
+    def subset(
+        self,
+        filter: Expression | None = None,
+        schema: Schema | None = None,
+        row_group_ids: list[int] | None = None,
+    ) -> ParquetFileFragment: ...
+
+
+class ParquetReadOptions(_Weakrefable):
+    def __init__(
+        self,
+        dictionary_columns: list[str] | set[str] | None = None,
+        coerce_int96_timestamp_unit: str | None = None,
+        binary_type: DataType | None = None,
+        list_type: type[ListType | LargeListType] | None = None,
+    ) -> None: ...
+
+    @property
+    def dictionary_columns(self) -> set[str]: ...
+    @dictionary_columns.setter
+    def dictionary_columns(self, columns: list[str] | set[str]) -> None: ...
+
+    @property
+    def coerce_int96_timestamp_unit(self) -> str: ...
+    @coerce_int96_timestamp_unit.setter
+    def coerce_int96_timestamp_unit(self, unit: str) -> None: ...
+
+    @property
+    def binary_type(self) -> DataType: ...
+    @binary_type.setter
+    def binary_type(self, type: DataType | None) -> None: ...
+
+    @property
+    def list_type(self) -> type[ListType | LargeListType]: ...
+    @list_type.setter
+    def list_type(self, type: type[ListType | LargeListType] | None) -> None: ...
+
+    def equals(self, other: ParquetReadOptions) -> bool: ...
+
+
+class ParquetFileWriteOptions(FileWriteOptions):
+    def update(self, **kwargs) -> None: ...
+    def _set_properties(self) -> None: ...
+    def _set_arrow_properties(self) -> None: ...
+    def _set_encryption_config(self) -> None: ...
+    # accept passthrough options used in tests
+    def __init__(self, **kwargs) -> None: ...
+
+
+@dataclass(kw_only=True)
+class ParquetFragmentScanOptions(FragmentScanOptions):
+    use_buffered_stream: bool = False
+    buffer_size: int = 8192
+    pre_buffer: bool = True
+    cache_options: CacheOptions | None = None
+    thrift_string_size_limit: int | None = None
+    thrift_container_size_limit: int | None = None
+    decryption_config: ParquetDecryptionConfig | None = None
+    decryption_properties: FileDecryptionProperties | None = None
+    page_checksum_verification: bool = False
+
+    def equals(self, other: ParquetFragmentScanOptions) -> bool: ...
+
+
+@dataclass
+class ParquetFactoryOptions(_Weakrefable):
+
+    partition_base_dir: str | None = None
+    partitioning: Partitioning | PartitioningFactory | None = None
+    validate_column_chunk_paths: bool = False
+
+
+class ParquetDatasetFactory(DatasetFactory):
+    def __init__(
+        self,
+        metadata_path: str,
+        filesystem: SupportedFileSystem,
+        format: FileFormat,
+        options: ParquetFactoryOptions | None = None,
+    ) -> None: ...
diff --git a/python/pyarrow-stubs/pyarrow/_dataset_parquet_encryption.pyi b/python/pyarrow-stubs/pyarrow/_dataset_parquet_encryption.pyi
new file mode 100644
index 000000000000..b36f18522e5e
--- /dev/null
+++ b/python/pyarrow-stubs/pyarrow/_dataset_parquet_encryption.pyi
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from ._dataset_parquet import ParquetFileWriteOptions, ParquetFragmentScanOptions
+from ._parquet import FileDecryptionProperties
+from ._parquet_encryption import (CryptoFactory, EncryptionConfiguration,
+                                  DecryptionConfiguration, KmsConnectionConfig)
+from .lib import _Weakrefable
+
+
+class ParquetEncryptionConfig(_Weakrefable):  # argument of set_encryption_config
+    def __init__(
+        self,
+        crypto_factory: CryptoFactory,
+        kms_connection_config: KmsConnectionConfig,
+        encryption_config: EncryptionConfiguration,
+    ) -> None: ...
+
+
+class ParquetDecryptionConfig(_Weakrefable):  # decryption-side counterpart
+    def __init__(
+        self,
+        crypto_factory: CryptoFactory,
+        kms_connection_config: KmsConnectionConfig,
+        decryption_config: DecryptionConfiguration,
+    ) -> None: ...
+
+
+def set_encryption_config(
+    opts: ParquetFileWriteOptions,  # the write options that receive ``config``
+    config: ParquetEncryptionConfig,
+) -> None: ...
+
+
+def set_decryption_properties(
+    opts: ParquetFragmentScanOptions,
+    config: FileDecryptionProperties,
+) -> None: ...
+
+
+def set_decryption_config(
+    opts: ParquetFragmentScanOptions,
+    config: ParquetDecryptionConfig,
+) -> None: ...
diff --git a/python/pyarrow-stubs/pyarrow/_parquet.pyi b/python/pyarrow-stubs/pyarrow/_parquet.pyi
new file mode 100644
index 000000000000..2521936ad5c5
--- /dev/null
+++ b/python/pyarrow-stubs/pyarrow/_parquet.pyi
@@ -0,0 +1,524 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from collections.abc import Iterable, Iterator, Sequence
+from typing import IO, Any, Literal, TypeAlias, TypedDict
+
+from _typeshed import StrPath
+
+from ._stubs_typing import Order
+from .lib import (
+    Buffer,
+    ChunkedArray,
+    KeyValueMetadata,
+    MemoryPool,
+    NativeFile,
+    RecordBatch,
+    Schema,
+    Table,
+    _Weakrefable,
+    DataType,
+    ListType,
+    LargeListType
+)
+
+_PhysicalType: TypeAlias = Literal[
+    "BOOLEAN",
+    "INT32",
+    "INT64",
+    "INT96",
+    "FLOAT",
+    "DOUBLE",
+    "BYTE_ARRAY",
+    "FIXED_LEN_BYTE_ARRAY",
+    "UNKNOWN",
+]
+_LogicTypeName: TypeAlias = Literal[
+    "UNDEFINED",
+    "STRING",
+    "MAP",
+    "LIST",
+    "ENUM",
+    "DECIMAL",
+    "DATE",
+    "TIME",
+    "TIMESTAMP",
+    "INT",
+    "FLOAT16",
+    "JSON",
+    "BSON",
+    "UUID",
+    "NONE",
+    "UNKNOWN",
+]
+_ConvertedType: TypeAlias = Literal[
+    "NONE",
+    "UTF8",
+    "MAP",
+    "MAP_KEY_VALUE",
+    "LIST",
+    "ENUM",
+    "DECIMAL",
+    "DATE",
+    "TIME_MILLIS",
+    "TIME_MICROS",
+    "TIMESTAMP_MILLIS",
+    "TIMESTAMP_MICROS",
+    "UINT_8",
+    "UINT_16",
+    "UINT_32",
+    "UINT_64",
+    "INT_8",
+    "INT_16",
+    "INT_32",
+    "INT_64",
+    "JSON",
+    "BSON",
+    "INTERVAL",
+    "UNKNOWN",
+]
+_Encoding: TypeAlias = Literal[
+    "PLAIN",
+    "PLAIN_DICTIONARY",
+    "RLE",
+    "BIT_PACKED",
+    "DELTA_BINARY_PACKED",
+    "DELTA_LENGTH_BYTE_ARRAY",
+    "DELTA_BYTE_ARRAY",
+    "RLE_DICTIONARY",
+    "BYTE_STREAM_SPLIT",
+    "UNKNOWN",
+]
+_Compression: TypeAlias = Literal[
+    "UNCOMPRESSED",
+    "SNAPPY",
+    "GZIP",
+    "LZO",
+    "BROTLI",
+    "LZ4",
+    "ZSTD",
+    "UNKNOWN",
+]
+
+
+class _Statistics(TypedDict):  # shape returned by Statistics.to_dict()
+    has_min_max: bool
+    min: Any | None
+    max: Any | None
+    null_count: int | None
+    distinct_count: int | None
+    num_values: int
+    physical_type: _PhysicalType
+
+
+class Statistics(_Weakrefable):
+    def to_dict(self) -> _Statistics: ...
+    def equals(self, other: Statistics) -> bool: ...
+    @property
+    def has_min_max(self) -> bool: ...
+    @property
+    def has_null_count(self) -> bool: ...
+    @property
+    def has_distinct_count(self) -> bool: ...
+    @property
+    def min_raw(self) -> Any | None: ...
+    @property
+    def max_raw(self) -> Any | None: ...
+    @property
+    def min(self) -> Any | None: ...
+    @property
+    def max(self) -> Any | None: ...
+    @property
+    def null_count(self) -> int | None: ...
+    @property
+    def distinct_count(self) -> int | None: ...
+    @property
+    def num_values(self) -> int: ...
+    @property
+    def physical_type(self) -> _PhysicalType: ...
+    @property
+    def logical_type(self) -> ParquetLogicalType: ...
+    @property
+    def converted_type(self) -> _ConvertedType | None: ...
+    @property
+    def is_min_exact(self) -> bool: ...
+    @property
+    def is_max_exact(self) -> bool: ...
+
+
+class ParquetLogicalType(_Weakrefable):
+    def to_json(self) -> str: ...
+    @property
+    def type(self) -> _LogicTypeName: ...
+
+
+class _ColumnChunkMetaData(TypedDict):  # shape of ColumnChunkMetaData.to_dict()
+    file_offset: int
+    file_path: str | None
+    physical_type: _PhysicalType
+    num_values: int
+    path_in_schema: str
+    is_stats_set: bool
+    statistics: Statistics | None
+    compression: _Compression
+    encodings: tuple[_Encoding, ...]
+    has_dictionary_page: bool
+    dictionary_page_offset: int | None
+    data_page_offset: int
+    total_compressed_size: int
+    total_uncompressed_size: int
+
+
+class ColumnChunkMetaData(_Weakrefable):
+    def to_dict(self) -> _ColumnChunkMetaData: ...
+    def equals(self, other: ColumnChunkMetaData) -> bool: ...
+    @property
+    def file_offset(self) -> int: ...
+    @property
+    def file_path(self) -> str | None: ...
+    @property
+    def physical_type(self) -> _PhysicalType: ...
+    @property
+    def num_values(self) -> int: ...
+    @property
+    def path_in_schema(self) -> str: ...
+    @property
+    def is_stats_set(self) -> bool: ...
+    @property
+    def statistics(self) -> Statistics | None: ...
+    @property
+    def compression(self) -> _Compression: ...
+    @property
+    def encodings(self) -> tuple[_Encoding, ...]: ...
+    @property
+    def has_dictionary_page(self) -> bool: ...
+    @property
+    def dictionary_page_offset(self) -> int | None: ...
+    @property
+    def data_page_offset(self) -> int: ...
+    @property
+    def has_index_page(self) -> bool: ...
+    @property
+    def index_page_offset(self) -> int: ...
+    @property
+    def total_compressed_size(self) -> int: ...
+    @property
+    def total_uncompressed_size(self) -> int: ...
+    @property
+    def has_offset_index(self) -> bool: ...
+    @property
+    def has_column_index(self) -> bool: ...
+    @property
+    def metadata(self) -> dict[bytes, bytes] | None: ...
+    @property
+    def name(self) -> str: ...
+    @property
+    def max_definition_level(self) -> int: ...
+    @property
+    def max_repetition_level(self) -> int: ...
+    @property
+    def converted_type(self) -> _ConvertedType: ...
+    @property
+    def logical_type(self) -> ParquetLogicalType: ...
+
+
+class _SortingColumn(TypedDict):  # shape returned by SortingColumn.to_dict()
+    column_index: int
+    descending: bool
+    nulls_first: bool
+
+
+class SortingColumn:
+    def __init__(
+        self, column_index: int, descending: bool = False, nulls_first: bool = False
+    ) -> None: ...
+
+    @classmethod
+    def from_ordering(
+        cls,
+        schema: Schema,
+        sort_keys: Sequence[str]
+        | Sequence[tuple[str, Order]]
+        | Sequence[str | tuple[str, Order]],
+        null_placement: Literal["at_start", "at_end"] = "at_end",
+    ) -> tuple[SortingColumn, ...]: ...
+
+    @staticmethod
+    def to_ordering(
+        schema: Schema, sorting_columns: tuple[SortingColumn, ...] | list[SortingColumn]
+    ) -> tuple[Sequence[tuple[str, Order]], Literal["at_start", "at_end"]]: ...
+    def __hash__(self) -> int: ...
+    @property
+    def column_index(self) -> int: ...
+    @property
+    def descending(self) -> bool: ...
+    @property
+    def nulls_first(self) -> bool: ...
+    def to_dict(self) -> _SortingColumn: ...
+
+
+class _RowGroupMetaData(TypedDict):  # shape returned by RowGroupMetaData.to_dict()
+    num_columns: int
+    num_rows: int
+    total_byte_size: int
+    columns: list[ColumnChunkMetaData]
+    sorting_columns: list[SortingColumn]
+
+
+class RowGroupMetaData(_Weakrefable):
+    def __init__(self, parent: FileMetaData, index: int) -> None: ...
+    def equals(self, other: RowGroupMetaData) -> bool: ...
+    def column(self, i: int) -> ColumnChunkMetaData: ...
+    def to_dict(self) -> _RowGroupMetaData: ...
+    @property
+    def num_columns(self) -> int: ...
+    @property
+    def num_rows(self) -> int: ...
+    @property
+    def total_byte_size(self) -> int: ...
+    @property
+    def sorting_columns(self) -> list[SortingColumn]: ...
+
+
+class _FileMetaData(TypedDict):  # shape returned by FileMetaData.to_dict()
+    created_by: str
+    num_columns: int
+    num_rows: int
+    num_row_groups: int
+    format_version: str
+    serialized_size: int
+    row_groups: list[Any]  # List of row group metadata dictionaries
+
+
+class FileMetaData(_Weakrefable):
+    def __hash__(self) -> int: ...
+    def to_dict(self) -> _FileMetaData: ...
+    def equals(self, other: FileMetaData) -> bool: ...
+    @property
+    def schema(self) -> ParquetSchema: ...
+    @property
+    def serialized_size(self) -> int: ...
+    @property
+    def num_columns(self) -> int: ...
+    @property
+    def num_rows(self) -> int: ...
+    @property
+    def num_row_groups(self) -> int: ...
+    @property
+    def format_version(self) -> str: ...
+    @property
+    def created_by(self) -> str: ...
+    @property
+    def metadata(self) -> dict[bytes, bytes] | None: ...
+    def row_group(self, i: int) -> RowGroupMetaData: ...
+    def set_file_path(self, path: str) -> None: ...
+    def append_row_groups(self, other: FileMetaData) -> None: ...
+    def write_metadata_file(self, where: StrPath | Buffer |
+                            NativeFile | IO) -> None: ...
+
+
+class ParquetSchema(_Weakrefable):
+    def __init__(self, container: FileMetaData) -> None: ...
+    def __getitem__(self, i: int) -> ColumnSchema: ...
+    def __hash__(self) -> int: ...
+    def __len__(self) -> int: ...
+    @property
+    def names(self) -> list[str]: ...
+    def to_arrow_schema(self) -> Schema: ...
+    def equals(self, other: ParquetSchema) -> bool: ...
+    def column(self, i: int) -> ColumnSchema: ...
+
+
+class ColumnSchema(_Weakrefable):
+    def __init__(self, schema: ParquetSchema, index: int) -> None: ...
+    def equals(self, other: ColumnSchema) -> bool: ...
+    @property
+    def name(self) -> str: ...
+ @property + def path(self) -> str: ... + @property + def max_definition_level(self) -> int: ... + @property + def max_repetition_level(self) -> int: ... + @property + def physical_type(self) -> _PhysicalType: ... + @property + def logical_type(self) -> ParquetLogicalType: ... + @property + def converted_type(self) -> _ConvertedType | None: ... + @property + def length(self) -> int | None: ... + @property + def precision(self) -> int | None: ... + @property + def scale(self) -> int | None: ... + + +class ParquetReader(_Weakrefable): + def __init__(self, memory_pool: MemoryPool | None = None) -> None: ... + + def open( + self, + source: StrPath | Buffer | NativeFile | IO, + *, + use_memory_map: bool = False, + read_dictionary: Iterable[int] | Iterable[str] | None = None, + metadata: FileMetaData | None = None, + binary_type: DataType | None = None, + list_type: ListType | LargeListType | None = None, + buffer_size: int = 0, + pre_buffer: bool = False, + coerce_int96_timestamp_unit: str | None = None, + decryption_properties: FileDecryptionProperties | None = None, + thrift_string_size_limit: int | None = None, + thrift_container_size_limit: int | None = None, + page_checksum_verification: bool = False, + arrow_extensions_enabled: bool | None = None, + ) -> None: ... + + @property + def column_paths(self) -> list[str]: ... + @property + def metadata(self) -> FileMetaData: ... + @property + def schema_arrow(self) -> Schema: ... + @property + def num_row_groups(self) -> int: ... + def set_use_threads(self, use_threads: bool) -> None: ... + def set_batch_size(self, batch_size: int) -> None: ... + + def iter_batches( + self, + batch_size: int = 65536, + row_groups: list[int] | range | None = None, + column_indices: list[str] | list[int] | None = None, + use_threads: bool = True, + use_pandas_metadata: bool = False, + ) -> Iterator[RecordBatch]: ... + + def read_row_group( + self, i: int, column_indices: list[int] | None = None, use_threads: bool = True + ) -> Table: ... 
+ + def read_row_groups( + self, + row_groups: Sequence[int] | range, + column_indices: list[str] | list[int] | None = None, + use_threads: bool = True, + use_pandas_metadata: bool = False, + ) -> Table: ... + + def read_all( + self, column_indices: list[int] | None = None, use_threads: bool = True + ) -> Table: ... + + def scan_contents( + self, columns: Sequence[str] | Sequence[int] | None = None, + batch_size: int = 65536 + ) -> int: ... + + def column_name_idx(self, column_name: str) -> int: ... + def read_column(self, column_index: int) -> ChunkedArray: ... + def close(self) -> None: ... + @property + def closed(self) -> bool: ... + + +class ParquetWriter(_Weakrefable): + def __init__( + self, + where: StrPath | NativeFile | IO, + schema: Schema, + use_dictionary: bool | list[str] | None = None, + compression: _Compression | dict[str, _Compression] | str | None = None, + version: str | None = None, + write_statistics: bool | list[str] | None = None, + memory_pool: MemoryPool | None = None, + use_deprecated_int96_timestamps: bool = False, + coerce_timestamps: Literal["ms", "us"] | None = None, + data_page_size: int | None = None, + allow_truncated_timestamps: bool = False, + compression_level: int | dict[str, int] | None = None, + use_byte_stream_split: bool | list[str] = False, + column_encoding: _Encoding | dict[str, _Encoding] | None = None, + writer_engine_version: str | None = None, + data_page_version: str | None = None, + use_compliant_nested_type: bool = True, + encryption_properties: FileDecryptionProperties | None = None, + write_batch_size: int | None = None, + dictionary_pagesize_limit: int | None = None, + store_schema: bool = True, + write_page_index: bool = False, + write_page_checksum: bool = False, + sorting_columns: tuple[SortingColumn, ...] | None = None, + store_decimal_as_integer: bool = False, + write_time_adjusted_to_utc: bool = False, + max_rows_per_page: int | None = None, + ): ... + def close(self) -> None: ... 
+ def write_table(self, table: Table, row_group_size: int | None = None) -> None: ... + def add_key_value_metadata(self, key_value_metadata: KeyValueMetadata) -> None: ... + @property + def metadata(self) -> FileMetaData: ... + @property + def use_dictionary(self) -> bool | list[str] | None: ... + @property + def use_deprecated_int96_timestamps(self) -> bool: ... + @property + def use_byte_stream_split(self) -> bool | list[str]: ... + @property + def column_encoding(self) -> _Encoding | dict[str, _Encoding] | None: ... + @property + def coerce_timestamps(self) -> Literal["ms", "us"] | None: ... + @property + def allow_truncated_timestamps(self) -> bool: ... + @property + def compression(self) -> _Compression | dict[str, _Compression] | None: ... + @property + def compression_level(self) -> int | dict[str, int] | None: ... + @property + def data_page_version(self) -> str | None: ... + @property + def use_compliant_nested_type(self) -> bool: ... + @property + def version(self) -> str | None: ... + @property + def write_statistics(self) -> bool | list[str] | None: ... + @property + def writer_engine_version(self) -> str: ... + @property + def row_group_size(self) -> int: ... + @property + def data_page_size(self) -> int: ... + @property + def encryption_properties(self) -> FileDecryptionProperties: ... + @property + def write_batch_size(self) -> int: ... + @property + def dictionary_pagesize_limit(self) -> int: ... + @property + def store_schema(self) -> bool: ... + @property + def store_decimal_as_integer(self) -> bool: ... + + +class FileEncryptionProperties: + ... + + +class FileDecryptionProperties: + ... 
diff --git a/python/pyarrow-stubs/pyarrow/_parquet_encryption.pyi b/python/pyarrow-stubs/pyarrow/_parquet_encryption.pyi new file mode 100644 index 000000000000..74b50ce665d1 --- /dev/null +++ b/python/pyarrow-stubs/pyarrow/_parquet_encryption.pyi @@ -0,0 +1,141 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import datetime as dt +import pathlib + +from collections.abc import Callable + +from pyarrow._fs import FileSystem +from ._parquet import FileDecryptionProperties, FileEncryptionProperties +from .lib import _Weakrefable + + +class EncryptionConfiguration(_Weakrefable): + footer_key: str + column_keys: dict[str, list[str]] + encryption_algorithm: str + plaintext_footer: bool + double_wrapping: bool + cache_lifetime: dt.timedelta + internal_key_material: bool + data_key_length_bits: int + uniform_encryption: bool + + def __init__( + self, + footer_key: str, + *, + column_keys: dict[str, str | list[str]] | None = None, + encryption_algorithm: str | None = None, + plaintext_footer: bool | None = None, + double_wrapping: bool | None = None, + cache_lifetime: dt.timedelta | None = None, + internal_key_material: bool | None = None, + data_key_length_bits: int | None = None, + uniform_encryption: bool | None = None, + ) -> None: ... + + +class DecryptionConfiguration(_Weakrefable): + cache_lifetime: dt.timedelta + def __init__(self, *, cache_lifetime: dt.timedelta | None = None): ... + + +class KmsConnectionConfig(_Weakrefable): + kms_instance_id: str + kms_instance_url: str + key_access_token: str + custom_kms_conf: dict[str, str] + + def __init__( + self, + *, + kms_instance_id: str | None = None, + kms_instance_url: str | None = None, + key_access_token: str | None = None, + custom_kms_conf: dict[str, str] | None = None, + ) -> None: ... + def refresh_key_access_token(self, value: str) -> None: ... + + +class KmsClient(_Weakrefable): + def wrap_key(self, key_bytes: bytes, master_key_identifier: str) -> str: ... + def unwrap_key(self, wrapped_key: str, master_key_identifier: str) -> bytes: ... + + +class CryptoFactory(_Weakrefable): + def __init__(self, kms_client_factory: Callable[[ + KmsConnectionConfig], KmsClient]): ... 
+ + def file_encryption_properties( + self, + kms_connection_config: KmsConnectionConfig, + encryption_config: EncryptionConfiguration, + ) -> FileEncryptionProperties: ... + + def file_decryption_properties( + self, + kms_connection_config: KmsConnectionConfig, + decryption_config: DecryptionConfiguration | None = None, + ) -> FileDecryptionProperties: ... + def remove_cache_entries_for_token(self, access_token: str) -> None: ... + def remove_cache_entries_for_all_tokens(self) -> None: ... + def rotate_master_keys( + self, + kms_connection_config: KmsConnectionConfig, + parquet_file_path: str | pathlib.Path, + filesystem: FileSystem | None = None, + double_wrapping: bool = True, + cache_lifetime_seconds: int | float = 600, + ) -> None: ... + + +class KeyMaterial(_Weakrefable): + @property + def is_footer_key(self) -> bool: ... + @property + def is_double_wrapped(self) -> bool: ... + @property + def master_key_id(self) -> str: ... + @property + def wrapped_dek(self) -> str: ... + @property + def kek_id(self) -> str: ... + @property + def wrapped_kek(self) -> str: ... + @property + def kms_instance_id(self) -> str: ... + @property + def kms_instance_url(self) -> str: ... + @staticmethod + def wrap(key_material: KeyMaterial) -> KeyMaterial: ... + @staticmethod + def parse(key_material_string: str) -> KeyMaterial: ... + + + +class FileSystemKeyMaterialStore(_Weakrefable): + def get_key_material(self, key_id: str) -> KeyMaterial: ... + def get_key_id_set(self) -> list[str]: ... + @classmethod + def for_file( + cls, + parquet_file_path: str | pathlib.Path, /, + filesystem: FileSystem | None = None + ) -> FileSystemKeyMaterialStore: + ... diff --git a/python/pyarrow-stubs/pyarrow/dataset.pyi b/python/pyarrow-stubs/pyarrow/dataset.pyi new file mode 100644 index 000000000000..66d86b14a259 --- /dev/null +++ b/python/pyarrow-stubs/pyarrow/dataset.pyi @@ -0,0 +1,199 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from collections.abc import Callable, Iterable, Sequence +from typing import Literal, TypeAlias, Any + +from _typeshed import StrPath +from pyarrow._dataset import ( + CsvFileFormat, + CsvFragmentScanOptions, + Dataset, + DatasetFactory, + DirectoryPartitioning, + FeatherFileFormat, + FileFormat, + FileFragment, + FilenamePartitioning, + FileSystemDataset, + FileSystemDatasetFactory, + FileSystemFactoryOptions, + FileWriteOptions, + Fragment, + FragmentScanOptions, + HivePartitioning, + InMemoryDataset, + IpcFileFormat, + IpcFileWriteOptions, + JsonFileFormat, + JsonFragmentScanOptions, + Partitioning, + PartitioningFactory, + Scanner, + TaggedRecordBatch, + UnionDataset, + UnionDatasetFactory, + WrittenFile, + get_partition_keys, +) +from pyarrow._dataset_orc import OrcFileFormat +from pyarrow._dataset_parquet import ( + ParquetDatasetFactory, + ParquetFactoryOptions, + ParquetFileFormat, + ParquetFileFragment, + ParquetFileWriteOptions, + ParquetFragmentScanOptions, + ParquetReadOptions, + RowGroupInfo, +) +from pyarrow._dataset_parquet_encryption import ( + ParquetDecryptionConfig, + ParquetEncryptionConfig, +) +from pyarrow.compute import Expression, field, scalar +from pyarrow.lib import Array, RecordBatch, RecordBatchReader, Schema, Table + +from ._fs import SupportedFileSystem + 
# Flags reporting whether the optional ORC / Parquet dataset modules imported.
_orc_available: bool
_parquet_available: bool

__all__ = [
    "CsvFileFormat",
    "CsvFragmentScanOptions",
    "Dataset",
    "DatasetFactory",
    "DirectoryPartitioning",
    "FeatherFileFormat",
    "FileFormat",
    "FileFragment",
    "FilenamePartitioning",
    "FileSystemDataset",
    "FileSystemDatasetFactory",
    "FileSystemFactoryOptions",
    "FileWriteOptions",
    "Fragment",
    "FragmentScanOptions",
    "HivePartitioning",
    "InMemoryDataset",
    "IpcFileFormat",
    "IpcFileWriteOptions",
    "JsonFileFormat",
    "JsonFragmentScanOptions",
    "Partitioning",
    "PartitioningFactory",
    "Scanner",
    "TaggedRecordBatch",
    "UnionDataset",
    "UnionDatasetFactory",
    "WrittenFile",
    "get_partition_keys",
    # Orc
    "OrcFileFormat",
    # Parquet
    "ParquetDatasetFactory",
    "ParquetFactoryOptions",
    "ParquetFileFormat",
    "ParquetFileFragment",
    "ParquetFileWriteOptions",
    "ParquetFragmentScanOptions",
    "ParquetReadOptions",
    "RowGroupInfo",
    # Parquet Encryption
    "ParquetDecryptionConfig",
    "ParquetEncryptionConfig",
    # Compute
    "Expression",
    "field",
    "scalar",
    # Dataset
    "partitioning",
    "parquet_dataset",
    "dataset",
    "write_dataset",
]

# Known format names, plus any other string. ``str`` must sit outside the
# ``Literal[...]``: only literal values are legal Literal parameters.
_DatasetFormat: TypeAlias = (
    Literal["parquet", "ipc", "arrow", "feather", "csv", "json", "orc"] | str
)


# Every argument is optional, so each ``None`` default is spelled out in the
# annotation instead of relying on implicit Optional.
def partitioning(
    schema: Schema | None = None,
    *,
    field_names: list[str] | None = None,
    flavor: Literal["hive", "filename"] | None = None,
    dictionaries: dict[str, Array] | Literal["infer"] | None = None,
) -> Partitioning | PartitioningFactory: ...


def parquet_dataset(
    metadata_path: StrPath,
    schema: Schema | None = None,
    filesystem: SupportedFileSystem | None = None,
    format: ParquetFileFormat | None = None,
    partitioning: Partitioning | PartitioningFactory | str | None = None,
    partition_base_dir: str | None = None,
) -> FileSystemDataset: ...
+ + +def dataset( + source: StrPath + | Sequence[Dataset] + | Sequence[StrPath] + | Iterable[RecordBatch] + | Iterable[Table] + | RecordBatchReader + | RecordBatch + | Table, + schema: Schema | None = None, + format: FileFormat | _DatasetFormat | None = None, + filesystem: SupportedFileSystem | str | None = None, + partitioning: Partitioning | PartitioningFactory | str | list[str] | None = None, + partition_base_dir: str | None = None, + exclude_invalid_files: bool | None = None, + ignore_prefixes: list[str] | None = None, +) -> FileSystemDataset | UnionDataset | InMemoryDataset | Dataset: ... + + +def write_dataset( + data: Any | Dataset | Table | RecordBatch | RecordBatchReader | list[Table] + | Iterable[RecordBatch] | Scanner, + base_dir: StrPath, + *, + basename_template: str | None = None, + format: FileFormat | _DatasetFormat | None = None, + partitioning: Partitioning | PartitioningFactory | list[str] | None = None, + partitioning_flavor: str | None = None, + schema: Schema | None = None, + filesystem: SupportedFileSystem | str | None = None, + file_options: FileWriteOptions | None = None, + use_threads: bool | None = True, + max_partitions: int = 1024, + max_open_files: int = 1024, + max_rows_per_file: int = 0, + min_rows_per_group: int = 0, + max_rows_per_group: int = 1024 * 1024, # noqa: Y011 + file_visitor: Callable[[str], None] | None = None, + existing_data_behavior: + Literal["error", "overwrite_or_ignore", "delete_matching"] = "error", + create_dir: bool = True, + preserve_order: bool | None = None, +): ... + + +def _get_partition_keys(partition_expression: Expression) -> dict[str, Any]: ... diff --git a/python/pyarrow-stubs/pyarrow/parquet/__init__.pyi b/python/pyarrow-stubs/pyarrow/parquet/__init__.pyi new file mode 100644 index 000000000000..5329bd6c66af --- /dev/null +++ b/python/pyarrow-stubs/pyarrow/parquet/__init__.pyi @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from .core import * # noqa: F401, F403 diff --git a/python/pyarrow-stubs/pyarrow/parquet/core.pyi b/python/pyarrow-stubs/pyarrow/parquet/core.pyi new file mode 100644 index 000000000000..83326c717aeb --- /dev/null +++ b/python/pyarrow-stubs/pyarrow/parquet/core.pyi @@ -0,0 +1,372 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
import sys

from pathlib import Path

if sys.version_info >= (3, 11):
    from typing import Self
else:
    from typing_extensions import Self
from collections.abc import Callable, Iterator, Iterable, Sequence
from typing import IO, Literal

if sys.version_info >= (3, 10):
    from typing import TypeAlias
else:
    from typing_extensions import TypeAlias

from pyarrow import _parquet
from pyarrow._compute import Expression
from pyarrow._fs import FileSystem, SupportedFileSystem
from pyarrow._parquet import (
    ColumnChunkMetaData,
    ColumnSchema,
    FileDecryptionProperties,
    FileEncryptionProperties,
    FileMetaData,
    ParquetLogicalType,
    ParquetReader,
    ParquetSchema,
    RowGroupMetaData,
    SortingColumn,
    Statistics,
)
from pyarrow._stubs_typing import FilterTuple, SingleOrList
from pyarrow.dataset import ParquetFileFragment, Partitioning, PartitioningFactory
from pyarrow.lib import Buffer, NativeFile, RecordBatch, Schema, Table, ChunkedArray
from typing_extensions import deprecated

# Names exported by ``from pyarrow.parquet.core import *``.
__all__ = (
    "ColumnChunkMetaData",
    "ColumnSchema",
    "FileDecryptionProperties",
    "FileEncryptionProperties",
    "FileMetaData",
    "ParquetDataset",
    "ParquetFile",
    "ParquetLogicalType",
    "ParquetReader",
    "ParquetSchema",
    "ParquetWriter",
    "RowGroupMetaData",
    "SortingColumn",
    "Statistics",
    "read_metadata",
    "read_pandas",
    "read_schema",
    "read_table",
    "write_metadata",
    "write_table",
    "write_to_dataset",
    "_filters_to_expression",
    "filters_to_expression",
)


# Takes a list whose items are filter tuples or lists of filter tuples and
# returns a single Expression.
def filters_to_expression(
    filters: list[FilterTuple | list[FilterTuple]],
) -> Expression: ...


# Deprecated alias with the same signature as ``filters_to_expression``.
@deprecated("use filters_to_expression")
def _filters_to_expression(
    filters: list[FilterTuple | list[FilterTuple]],
) -> Expression: ...
# Compression codec names accepted by the writer-facing APIs in this module.
_Compression: TypeAlias = Literal[
    "gzip", "bz2", "brotli", "lz4", "zstd", "snappy", "none"
]


class ParquetFile:
    reader: ParquetReader
    common_metadata: FileMetaData

    def __init__(
        self,
        source: str | Path | Buffer | NativeFile | IO,
        *,
        metadata: FileMetaData | None = None,
        common_metadata: FileMetaData | None = None,
        read_dictionary: list[str] | None = None,
        memory_map: bool = False,
        buffer_size: int = 0,
        pre_buffer: bool = False,
        coerce_int96_timestamp_unit: str | None = None,
        decryption_properties: FileDecryptionProperties | None = None,
        thrift_string_size_limit: int | None = None,
        thrift_container_size_limit: int | None = None,
        filesystem: SupportedFileSystem | None = None,
        page_checksum_verification: bool = False,
    ) -> None: ...
    def __enter__(self) -> Self: ...
    def __exit__(self, *args, **kwargs) -> None: ...
    @property
    def metadata(self) -> FileMetaData: ...
    @property
    def schema(self) -> ParquetSchema: ...
    @property
    def schema_arrow(self) -> Schema: ...
    @property
    def num_row_groups(self) -> int: ...
    def close(self, force: bool = False) -> None: ...
    @property
    def closed(self) -> bool: ...

    def read_row_group(
        self,
        i: int,
        columns: Sequence[str | int] | None = None,
        use_threads: bool = True,
        use_pandas_metadata: bool = False,
    ) -> Table: ...

    def read_row_groups(
        self,
        row_groups: Sequence[int],
        columns: Iterable[str | int] | None = None,
        use_threads: bool = True,
        use_pandas_metadata: bool = False,
    ) -> Table: ...

    def iter_batches(
        self,
        batch_size: int = 65536,
        row_groups: Sequence[int] | None = None,
        columns: Iterable[str | int] | None = None,
        use_threads: bool = True,
        use_pandas_metadata: bool = False,
    ) -> Iterator[RecordBatch]: ...

    def read(
        self,
        columns: Sequence[str | int] | None = None,
        use_threads: bool = True,
        use_pandas_metadata: bool = False,
    ) -> Table: ...

    def scan_contents(
        self, columns: Iterable[str | int] | None = None, batch_size: int = 65536
    ) -> int: ...


class ParquetWriter:
    flavor: str
    schema_changed: bool
    # The writer keeps the Arrow schema it was constructed with (see the
    # ``schema: Schema`` constructor parameter), not a ``ParquetSchema``.
    schema: Schema
    where: str | Path | IO
    # NOTE(review): the runtime attribute appears to be ``file_handle``;
    # ``file_handler`` is retained so existing annotations keep resolving.
    # Confirm against pyarrow/parquet/core.py and drop the stale name.
    file_handler: NativeFile | None
    file_handle: NativeFile | None
    writer: _parquet.ParquetWriter
    is_open: bool

    def __init__(
        self,
        where: str | Path | IO | NativeFile,
        schema: Schema,
        filesystem: SupportedFileSystem | None = None,
        flavor: str | None = None,
        version: Literal["1.0", "2.4", "2.6"] = ...,
        use_dictionary: bool = True,
        compression: _Compression | dict[str, _Compression] = "snappy",
        write_statistics: bool | list = True,
        use_deprecated_int96_timestamps: bool | None = None,
        compression_level: int | dict | None = None,
        use_byte_stream_split: bool | list = False,
        column_encoding: str | dict | None = None,
        writer_engine_version: str | None = None,
        data_page_version: Literal["1.0", "2.0"] = ...,
        use_compliant_nested_type: bool = True,
        encryption_properties: FileEncryptionProperties | None = None,
        write_batch_size: int | None = None,
        dictionary_pagesize_limit: int | None = None,
        store_schema: bool = True,
        write_page_index: bool = False,
        write_page_checksum: bool = False,
        sorting_columns: Sequence[SortingColumn] | None = None,
        store_decimal_as_integer: bool = False,
        max_rows_per_page: int | None = None,
        **options,
    ) -> None: ...
    def __enter__(self) -> Self: ...
    def __exit__(self, *args, **kwargs) -> Literal[False]: ...

    def write(
        self, table_or_batch: RecordBatch | Table, row_group_size: int | None = None
    ) -> None: ...

    def write_batch(
        self, batch: RecordBatch, row_group_size: int | None = None
    ) -> None: ...

    def write_table(self, table: Table, row_group_size: int | None = None) -> None: ...
    def close(self) -> None: ...
    def add_key_value_metadata(self, key_value_metadata: dict[str, str]) -> None: ...
class ParquetDataset:
    # ``filesystem`` and ``schema`` may be passed positionally after
    # ``path_or_paths``; everything after the bare ``*`` is keyword-only.
    def __init__(
        self,
        path_or_paths: SingleOrList[str]
        | SingleOrList[Path]
        | SingleOrList[NativeFile]
        | SingleOrList[IO],
        filesystem: SupportedFileSystem | None = None,
        schema: Schema | None = None,
        *,
        filters: Expression
        | FilterTuple
        | list[FilterTuple]
        | list[list[FilterTuple]]
        | None = None,
        read_dictionary: list[str] | None = None,
        memory_map: bool = False,
        buffer_size: int = 0,
        partitioning: str
        | list[str]
        | Partitioning
        | PartitioningFactory
        | None = "hive",
        ignore_prefixes: list[str] | None = None,
        pre_buffer: bool = True,
        coerce_int96_timestamp_unit: str | None = None,
        decryption_properties: FileDecryptionProperties | None = None,
        thrift_string_size_limit: int | None = None,
        thrift_container_size_limit: int | None = None,
        page_checksum_verification: bool = False,
    ) -> None: ...
    def equals(self, other: ParquetDataset) -> bool: ...
    @property
    def schema(self) -> Schema: ...

    def read(
        self,
        columns: list[str] | None = None,
        use_threads: bool = True,
        use_pandas_metadata: bool = False,
    ) -> Table: ...
    def read_pandas(self, **kwargs) -> Table: ...
    @property
    def fragments(self) -> list[ParquetFileFragment]: ...
    @property
    def files(self) -> list[str]: ...
    @property
    def filesystem(self) -> FileSystem: ...
    @property
    def partitioning(self) -> Partitioning: ...
# Only ``source`` is positional; every other option is keyword-only.
def read_table(
    source: SingleOrList[str]
    | SingleOrList[Path]
    | SingleOrList[NativeFile]
    | SingleOrList[IO]
    | Buffer,
    *,
    columns: list | None = None,
    use_threads: bool = True,
    schema: Schema | None = None,
    use_pandas_metadata: bool = False,
    read_dictionary: list[str] | None = None,
    memory_map: bool = False,
    buffer_size: int = 0,
    partitioning: str | list[str] | Partitioning | PartitioningFactory | None = "hive",
    filesystem: SupportedFileSystem | str | None = None,
    filters: Expression
    | FilterTuple
    | list[FilterTuple]
    | Sequence[Sequence[tuple]]
    | None = None,
    ignore_prefixes: list[str] | None = None,
    pre_buffer: bool = True,
    coerce_int96_timestamp_unit: str | None = None,
    decryption_properties: FileDecryptionProperties | None = None,
    thrift_string_size_limit: int | None = None,
    thrift_container_size_limit: int | None = None,
    page_checksum_verification: bool = False,
) -> Table: ...


# Takes a single source; any further options are accepted through ``**kwargs``.
def read_pandas(
    source: str | Path | NativeFile | IO | Buffer,
    columns: list | None = None,
    **kwargs,
) -> Table: ...
def write_table(
    table: Table,
    where: str | Path | NativeFile | IO,
    row_group_size: int | None = None,
    version: Literal["1.0", "2.4", "2.6"] = "2.6",
    use_dictionary: bool = True,
    compression: _Compression | dict[str, _Compression] = "snappy",
    write_statistics: bool | list = True,
    use_deprecated_int96_timestamps: bool | None = None,
    coerce_timestamps: str | None = None,
    allow_truncated_timestamps: bool = False,
    data_page_size: int | None = None,
    flavor: str | None = None,
    filesystem: SupportedFileSystem | str | None = None,
    compression_level: int | dict | None = None,
    # Same ``bool | list`` type as the ``ParquetWriter`` constructor
    # parameter of the same name.
    use_byte_stream_split: bool | list = False,
    column_encoding: str | dict | None = None,
    data_page_version: Literal["1.0", "2.0"] = ...,
    use_compliant_nested_type: bool = True,
    encryption_properties: FileEncryptionProperties | None = None,
    write_batch_size: int | None = None,
    dictionary_pagesize_limit: int | None = None,
    store_schema: bool = True,
    write_page_index: bool = False,
    write_page_checksum: bool = False,
    sorting_columns: Sequence[SortingColumn] | None = None,
    store_decimal_as_integer: bool = False,
    **kwargs,
) -> None: ...


def write_to_dataset(
    table: Table | ChunkedArray,
    root_path: str | Path,
    partition_cols: list[str] | None = None,
    filesystem: SupportedFileSystem | None = None,
    schema: Schema | None = None,
    partitioning: Partitioning | list[str] | None = None,
    basename_template: str | None = None,
    use_threads: bool | None = None,
    file_visitor: Callable[[str], None] | None = None,
    existing_data_behavior: Literal["overwrite_or_ignore", "error", "delete_matching"]
    | None = None,
    **kwargs,
) -> None: ...


def write_metadata(
    schema: Schema,
    where: str | NativeFile,
    metadata_collector: list[FileMetaData] | None = None,
    filesystem: SupportedFileSystem | None = None,
    **kwargs,
) -> None: ...
+
+
+# Type stub for read_metadata: takes a path or file-like `where` and returns
+# a FileMetaData. `memory_map`, `decryption_properties` and `filesystem` are
+# optional; `filesystem` may be given as an object or as a str.
+def read_metadata(
+    where: str | Path | IO | NativeFile,
+    memory_map: bool = False,
+    decryption_properties: FileDecryptionProperties | None = None,
+    filesystem: SupportedFileSystem | str | None = None,
+) -> FileMetaData: ...
+
+
+# Type stub for read_schema: same parameters as read_metadata, but the
+# declared result is a Schema.
+def read_schema(
+    where: str | Path | IO | NativeFile,
+    memory_map: bool = False,
+    decryption_properties: FileDecryptionProperties | None = None,
+    filesystem: SupportedFileSystem | str | None = None,
+) -> Schema: ...
diff --git a/python/pyarrow-stubs/pyarrow/parquet/encryption.pyi b/python/pyarrow-stubs/pyarrow/parquet/encryption.pyi
new file mode 100644
index 000000000000..7add1c6fa535
--- /dev/null
+++ b/python/pyarrow-stubs/pyarrow/parquet/encryption.pyi
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+ +from pyarrow._parquet_encryption import ( + CryptoFactory, + DecryptionConfiguration, + EncryptionConfiguration, + FileSystemKeyMaterialStore, + KmsClient, + KmsConnectionConfig, +) + +__all__ = [ + "CryptoFactory", + "DecryptionConfiguration", + "EncryptionConfiguration", + "FileSystemKeyMaterialStore", + "KmsClient", + "KmsConnectionConfig", +] diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 039da8c0d567..967c4b475ddf 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -54,6 +54,9 @@ get_partition_keys as _get_partition_keys, # keep for backwards compatibility _filesystemdataset_write, ) + from pyarrow.fs import FileInfo + + except ImportError as exc: raise ImportError( f"The pyarrow installation is not built with support for 'dataset' ({str(exc)})" @@ -70,7 +73,8 @@ ) try: - from pyarrow._dataset_orc import OrcFileFormat + from pyarrow._dataset_orc import ( # type: ignore[import-not-found] + OrcFileFormat) _orc_available = True except ImportError: pass @@ -371,6 +375,7 @@ def _ensure_multiple_sources(paths, filesystem=None): # possible improvement is to group the file_infos by type and raise for # multiple paths per error category if is_local: + # type: ignore[reportGeneralTypeIssues] for info in filesystem.get_file_info(paths): file_type = info.type if file_type == FileType.File: @@ -422,16 +427,18 @@ def _ensure_single_source(path, filesystem=None): filesystem, path = _resolve_filesystem_and_path(path, filesystem) # ensure that the path is normalized before passing to dataset discovery + assert isinstance(path, str) path = filesystem.normalize_path(path) # retrieve the file descriptor file_info = filesystem.get_file_info(path) + assert isinstance(file_info, FileInfo) # depending on the path type either return with a recursive # directory selector or as a list containing a single file - if file_info.type == FileType.Directory: + if file_info.type == FileType.Directory: # type: ignore[reportAttributeAccessIssue] 
paths_or_selector = FileSelector(path, recursive=True) - elif file_info.type == FileType.File: + elif file_info.type == FileType.File: # type: ignore[reportAttributeAccessIssue] paths_or_selector = [path] else: raise FileNotFoundError(path) @@ -1035,6 +1042,7 @@ def file_visitor(written_file): _filesystemdataset_write( scanner, base_dir, basename_template, filesystem, partitioning, preserve_order, file_options, max_partitions, file_visitor, - existing_data_behavior, max_open_files, max_rows_per_file, - min_rows_per_group, max_rows_per_group, create_dir + existing_data_behavior, # type: ignore[reportArgumentType] + max_open_files, max_rows_per_file, min_rows_per_group, + max_rows_per_group, create_dir ) diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py index 354f18124b53..639ae2a95c44 100644 --- a/python/pyarrow/parquet/core.py +++ b/python/pyarrow/parquet/core.py @@ -45,7 +45,7 @@ FileDecryptionProperties, SortingColumn) from pyarrow.fs import (LocalFileSystem, FileType, _resolve_filesystem_and_path, - _ensure_filesystem) + _ensure_filesystem, FileInfo) from pyarrow.util import guid, _is_path_like, _stringify_path, _deprecate_api @@ -1415,12 +1415,15 @@ def __init__(self, path_or_paths, filesystem=None, schema=None, *, filters=None, path_or_paths, filesystem, memory_map=memory_map ) finfo = filesystem.get_file_info(path_or_paths) + assert isinstance(finfo, FileInfo) if finfo.type == FileType.Directory: self._base_dir = path_or_paths else: single_file = path_or_paths - parquet_format = ds.ParquetFileFormat(**read_options) + parquet_format = ds.ParquetFileFormat( + **read_options # type: ignore[invalid-argument-type] + ) if single_file is not None: fragment = parquet_format.make_fragment(single_file, filesystem) @@ -1575,6 +1578,7 @@ def _get_common_pandas_metadata(self): for name in ["_common_metadata", "_metadata"]: metadata_path = os.path.join(str(self._base_dir), name) finfo = self.filesystem.get_file_info(metadata_path) + assert 
isinstance(finfo, FileInfo) if finfo.is_file: pq_meta = read_metadata( metadata_path, filesystem=self.filesystem) @@ -1673,6 +1677,7 @@ def files(self): >>> dataset.files ['dataset_v2_files/year=2019/...-0.parquet', ... """ + assert isinstance(self._dataset, pa.dataset.FileSystemDataset) return self._dataset.files @property @@ -1680,6 +1685,7 @@ def filesystem(self): """ The filesystem type of the Dataset source. """ + assert isinstance(self._dataset, pa.dataset.FileSystemDataset) return self._dataset.filesystem @property @@ -1687,6 +1693,7 @@ def partitioning(self): """ The partitioning of the Dataset source, if discovered. """ + assert isinstance(self._dataset, pa.dataset.FileSystemDataset) return self._dataset.partitioning @@ -1903,14 +1910,16 @@ def read_table(source, *, columns=None, use_threads=True, filesystem, path = _resolve_filesystem_and_path(source, filesystem) if filesystem is not None: - if not filesystem.get_file_info(path).is_file: + file_info = filesystem.get_file_info(path) + assert isinstance(file_info, FileInfo) + if not file_info.is_file: raise ValueError( "the 'source' argument should be " "an existing parquet file and not a directory " "when the pyarrow.dataset module is not available" ) - source = filesystem.open_input_file(path) + source = filesystem.open_input_file(path) # type: ignore dataset = ParquetFile( source, read_dictionary=read_dictionary, @@ -2083,7 +2092,8 @@ def write_table(table, where, row_group_size=None, version='2.6', def write_to_dataset(table, root_path, partition_cols=None, filesystem=None, schema=None, partitioning=None, basename_template=None, use_threads=None, - file_visitor=None, existing_data_behavior=None, + file_visitor=None, # type: ignore[reportRedeclaration] + existing_data_behavior=None, **kwargs): """Wrapper around dataset.write_dataset for writing a Table to Parquet format by partitions. 
@@ -2312,7 +2322,7 @@ def write_metadata(schema, where, metadata_collector=None, filesystem=None, filesystem, where = _resolve_filesystem_and_path(where, filesystem) if hasattr(where, "seek"): # file-like - cursor_position = where.tell() + cursor_position = where.tell() # type: ignore[reportAttributeAccessIssue] writer = ParquetWriter(where, schema, filesystem, **kwargs) writer.close() @@ -2321,8 +2331,8 @@ def write_metadata(schema, where, metadata_collector=None, filesystem=None, # ParquetWriter doesn't expose the metadata until it's written. Write # it and read it again. metadata = read_metadata(where, filesystem=filesystem) - if hasattr(where, "seek"): - where.seek(cursor_position) # file-like, set cursor back. + if hasattr(where, "seek"): # file-like, set cursor back. + where.seek(cursor_position) # type: ignore[reportAttributeAccessIssue] for m in metadata_collector: metadata.append_row_groups(m) diff --git a/python/pyarrow/parquet/encryption.py b/python/pyarrow/parquet/encryption.py index df6eed913fa5..1c6835d6acfe 100644 --- a/python/pyarrow/parquet/encryption.py +++ b/python/pyarrow/parquet/encryption.py @@ -20,4 +20,5 @@ EncryptionConfiguration, DecryptionConfiguration, KmsConnectionConfig, - KmsClient) + KmsClient, + FileSystemKeyMaterialStore) diff --git a/python/pyarrow/tests/parquet/common.py b/python/pyarrow/tests/parquet/common.py index 5390a24b90d2..3cbf5801dfc1 100644 --- a/python/pyarrow/tests/parquet/common.py +++ b/python/pyarrow/tests/parquet/common.py @@ -16,11 +16,12 @@ # under the License. 
import io +from typing import cast try: import numpy as np except ImportError: - np = None + pass import pyarrow as pa from pyarrow.tests import util @@ -137,7 +138,7 @@ def make_sample_file(table_or_df): else: a_table = pa.Table.from_pandas(table_or_df) - buf = io.BytesIO() + buf = io.BytesIO() # type: ignore[attr-defined] _write_table(a_table, buf, compression='SNAPPY', version='2.6') buf.seek(0) @@ -161,12 +162,9 @@ def alltypes_sample(size=10000, seed=0, categorical=False): 'float32': np.arange(size, dtype=np.float32), 'float64': np.arange(size, dtype=np.float64), 'bool': np.random.randn(size) > 0, - 'datetime_ms': np.arange("2016-01-01T00:00:00.001", size, - dtype='datetime64[ms]'), - 'datetime_us': np.arange("2016-01-01T00:00:00.000001", size, - dtype='datetime64[us]'), - 'datetime_ns': np.arange("2016-01-01T00:00:00.000000001", size, - dtype='datetime64[ns]'), + 'datetime_ms': pd.date_range("2016-01-01T00:00:00.001", periods=size, freq='ms').values, + 'datetime_us': pd.date_range("2016-01-01T00:00:00.000001", periods=size, freq='us').values, + 'datetime_ns': pd.date_range("2016-01-01T00:00:00.000000001", periods=size, freq='ns').values, 'timedelta': np.arange(0, size, dtype="timedelta64[s]"), 'str': pd.Series([str(x) for x in range(size)]), 'empty_str': [''] * size, @@ -175,5 +173,6 @@ def alltypes_sample(size=10000, seed=0, categorical=False): 'null_list': [None] * 2 + [[None] * (x % 4) for x in range(size - 2)], } if categorical: - arrays['str_category'] = arrays['str'].astype('category') + import pandas as pd + arrays['str_category'] = cast(pd.Series, arrays['str']).astype('category') return pd.DataFrame(arrays) diff --git a/python/pyarrow/tests/parquet/encryption.py b/python/pyarrow/tests/parquet/encryption.py index efaee1d08a93..7a6ef3de7bc1 100644 --- a/python/pyarrow/tests/parquet/encryption.py +++ b/python/pyarrow/tests/parquet/encryption.py @@ -30,7 +30,7 @@ def __init__(self, config): pe.KmsClient.__init__(self) self.master_keys_map = 
config.custom_kms_conf - def wrap_key(self, key_bytes, master_key_identifier): + def wrap_key(self, key_bytes, master_key_identifier): # type: ignore[override] """Not a secure cipher - the wrapped key is just the master key concatenated with key bytes""" master_key_bytes = self.master_keys_map[master_key_identifier].encode( @@ -39,7 +39,7 @@ def wrap_key(self, key_bytes, master_key_identifier): result = base64.b64encode(wrapped_key) return result - def unwrap_key(self, wrapped_key, master_key_identifier): + def unwrap_key(self, wrapped_key, master_key_identifier): # type: ignore[override] """Not a secure cipher - just extract the key from the wrapped key""" if master_key_identifier not in self.master_keys_map: diff --git a/python/pyarrow/tests/parquet/test_basic.py b/python/pyarrow/tests/parquet/test_basic.py index 345aee3c4ef4..347d10cf76a1 100644 --- a/python/pyarrow/tests/parquet/test_basic.py +++ b/python/pyarrow/tests/parquet/test_basic.py @@ -35,7 +35,7 @@ import pyarrow.parquet as pq from pyarrow.tests.parquet.common import _read_table, _write_table except ImportError: - pq = None + pass try: @@ -45,12 +45,12 @@ from pyarrow.tests.pandas_examples import dataframe_with_lists from pyarrow.tests.parquet.common import alltypes_sample except ImportError: - pd = tm = None + pass try: import numpy as np except ImportError: - np = None + pass # Marks all of the tests in this module # Ignore these with pytest ... -m 'not parquet' @@ -162,10 +162,10 @@ def test_invalid_source(): # Test that we provide an helpful error message pointing out # that None wasn't expected when trying to open a Parquet None file. 
with pytest.raises(TypeError, match="None"): - pq.read_table(None) + pq.read_table(None) # type: ignore[arg-type] with pytest.raises(TypeError, match="None"): - pq.ParquetFile(None) + pq.ParquetFile(None) # type: ignore[arg-type] def test_read_table_without_dataset(tempdir): @@ -755,7 +755,7 @@ def test_fastparquet_cross_compatibility(tempdir): # Arrow -> fastparquet file_arrow = str(tempdir / "cross_compat_arrow.parquet") - pq.write_table(table, file_arrow, compression=None) + pq.write_table(table, file_arrow, compression=None) # type: ignore[arg-type] fp_file = fp.ParquetFile(file_arrow) df_fp = fp_file.to_pandas() @@ -796,7 +796,7 @@ def test_buffer_contents( for col in table.columns: [chunk] = col.chunks buf = chunk.buffers()[1] - assert buf.to_pybytes() == buf.size * b"\0" + assert buf.to_pybytes() == buf.size * b"\0" # type: ignore[union-attr] def test_parquet_compression_roundtrip(tempdir): @@ -806,7 +806,7 @@ def test_parquet_compression_roundtrip(tempdir): # the stream due to auto-detecting the extension in the filename table = pa.table([pa.array(range(4))], names=["ints"]) path = tempdir / "arrow-10480.pyarrow.gz" - pq.write_table(table, path, compression="GZIP") + pq.write_table(table, path, compression="GZIP") # type: ignore[arg-type] result = pq.read_table(path) assert result.equals(table) @@ -831,7 +831,7 @@ def test_empty_row_groups(tempdir): def test_reads_over_batch(tempdir): data = [None] * (1 << 20) - data.append([1]) + data.append([1]) # type: ignore[reportArgumentType] # Large list with mostly nones and one final # value. This should force batched reads when # reading back. 
diff --git a/python/pyarrow/tests/parquet/test_compliant_nested_type.py b/python/pyarrow/tests/parquet/test_compliant_nested_type.py index 2345855a3321..af418812be82 100644 --- a/python/pyarrow/tests/parquet/test_compliant_nested_type.py +++ b/python/pyarrow/tests/parquet/test_compliant_nested_type.py @@ -24,15 +24,14 @@ from pyarrow.tests.parquet.common import (_read_table, _check_roundtrip) except ImportError: - pq = None + pass try: import pandas as pd - import pandas.testing as tm from pyarrow.tests.parquet.common import _roundtrip_pandas_dataframe except ImportError: - pd = tm = None + pass # Marks all of the tests in this module diff --git a/python/pyarrow/tests/parquet/test_data_types.py b/python/pyarrow/tests/parquet/test_data_types.py index c546bc1532ac..bd48ffe71558 100644 --- a/python/pyarrow/tests/parquet/test_data_types.py +++ b/python/pyarrow/tests/parquet/test_data_types.py @@ -22,7 +22,7 @@ try: import numpy as np except ImportError: - np = None + pass import pytest import pyarrow as pa @@ -33,7 +33,7 @@ import pyarrow.parquet as pq from pyarrow.tests.parquet.common import _read_table, _write_table except ImportError: - pq = None + pass try: @@ -44,7 +44,7 @@ dataframe_with_lists) from pyarrow.tests.parquet.common import alltypes_sample except ImportError: - pd = tm = None + pass # Marks all of the tests in this module @@ -142,7 +142,7 @@ def test_direct_read_dictionary(): read_dictionary=['f0']) # Compute dictionary-encoded subfield - expected = pa.table([table[0].dictionary_encode()], names=['f0']) + expected = pa.table([table.column(0).dictionary_encode()], names=['f0']) assert result.equals(expected) @@ -174,7 +174,7 @@ def test_direct_read_dictionary_subfield(): expected = pa.table([expected_arr], names=['f0']) assert result.equals(expected) - assert result[0].num_chunks == 1 + assert result.column(0).num_chunks == 1 @pytest.mark.numpy @@ -260,8 +260,8 @@ def test_single_pylist_column_roundtrip(tempdir, dtype,): _write_table(table, filename) 
table_read = _read_table(filename) for i in range(table.num_columns): - col_written = table[i] - col_read = table_read[i] + col_written = table.column(i) + col_read = table_read.column(i) assert table.field(i).name == table_read.field(i).name assert col_read.num_chunks == 1 data_written = col_written.chunk(0) diff --git a/python/pyarrow/tests/parquet/test_dataset.py b/python/pyarrow/tests/parquet/test_dataset.py index d3e9cda73018..14253ca7d6b2 100644 --- a/python/pyarrow/tests/parquet/test_dataset.py +++ b/python/pyarrow/tests/parquet/test_dataset.py @@ -20,35 +20,41 @@ import os import pathlib import sys +from typing import TYPE_CHECKING try: import numpy as np except ImportError: - np = None + pass import pytest import unittest.mock as mock import pyarrow as pa import pyarrow.compute as pc -from pyarrow.fs import (FileSelector, FileSystem, LocalFileSystem, +from pyarrow.fs import (FileSelector, FileSystem, LocalFileSystem, FileInfo, FileType, PyFileSystem, SubTreeFileSystem, FSSpecHandler) from pyarrow.tests import util from pyarrow.util import guid -try: +if TYPE_CHECKING: + import pandas as pd + import pandas.testing as tm import pyarrow.parquet as pq from pyarrow.tests.parquet.common import ( _read_table, _test_dataframe, _test_table, _write_table) -except ImportError: - pq = None +else: + try: + import pyarrow.parquet as pq + from pyarrow.tests.parquet.common import ( + _read_table, _test_dataframe, _test_table, _write_table) + except ImportError: + pass - -try: - import pandas as pd - import pandas.testing as tm - -except ImportError: - pd = tm = None + try: + import pandas as pd + import pandas.testing as tm + except ImportError: + pass # Marks all of the tests in this module @@ -70,8 +76,8 @@ def test_filesystem_uri(tempdir): assert result.equals(table) # filesystem URI - result = pq.read_table( - "data_dir/data.parquet", filesystem=util._filesystem_uri(tempdir)) + result = pq.read_table("data_dir/data.parquet", + filesystem=util._filesystem_uri(tempdir)) 
assert result.equals(table) @@ -553,7 +559,7 @@ def _generate_partition_directories(fs, base_dir, partition_spec, df): # ['bar', ['a', 'b', 'c']] # part_table : a pyarrow.Table to write to each partition if not isinstance(fs, FileSystem): - fs = PyFileSystem(FSSpecHandler(fs)) + fs = PyFileSystem(FSSpecHandler(fs)) # type: ignore[abstract] DEPTH = len(partition_spec) @@ -572,15 +578,15 @@ def _visit_level(base_dir, level, part_keys): if level == DEPTH - 1: # Generate example data - from pyarrow.fs import FileType - file_path = pathsep.join([level_dir, guid()]) filtered_df = _filter_partition(df, this_part_keys) part_table = pa.Table.from_pandas(filtered_df) with fs.open_output_stream(file_path) as f: _write_table(part_table, f) - assert fs.get_file_info(file_path).type != FileType.NotFound - assert fs.get_file_info(file_path).type == FileType.File + file_info = fs.get_file_info(file_path) + assert isinstance(file_info, FileInfo) + assert file_info.type != FileType.NotFound + assert file_info.type == FileType.File file_success = pathsep.join([level_dir, '_SUCCESS']) with fs.open_output_stream(file_success) as f: @@ -717,8 +723,8 @@ def test_dataset_read_pandas(tempdir): paths = [] for i in range(nfiles): df = _test_dataframe(size, seed=i) - df.index = np.arange(i * size, (i + 1) * size) - df.index.name = 'index' + df.index = np.arange(i * size, (i + 1) * size) # type: ignore[assignment] + df.index.name = 'index' # type: ignore[attr-defined] path = dirpath / f'{i}.parquet' @@ -931,8 +937,7 @@ def _test_write_to_dataset_with_partitions(base_path, 'group2': list('eefeffgeee'), 'num': list(range(10)), 'nan': [np.nan] * 10, - 'date': np.arange('2017-01-01', '2017-01-11', dtype='datetime64[D]').astype( - 'datetime64[ns]') + 'date': pd.date_range('2017-01-01', periods=10, freq='D').values.astype('datetime64[ns]') }) cols = output_df.columns.tolist() partition_by = ['group1', 'group2'] @@ -965,7 +970,7 @@ def _test_write_to_dataset_with_partitions(base_path, input_df_cols = 
input_df.columns.tolist() assert partition_by == input_df_cols[-1 * len(partition_by):] - input_df = input_df[cols] + input_df = input_df.loc[:, cols] # Partitioned columns become 'categorical' dtypes for col in partition_by: output_df[col] = output_df[col].astype('category') @@ -974,6 +979,7 @@ def _test_write_to_dataset_with_partitions(base_path, expected_date_type = schema.field('date').type.to_pandas_dtype() output_df["date"] = output_df["date"].astype(expected_date_type) + assert isinstance(input_df, pd.DataFrame) tm.assert_frame_equal(output_df, input_df) @@ -988,8 +994,7 @@ def _test_write_to_dataset_no_partitions(base_path, 'group1': list('aaabbbbccc'), 'group2': list('eefeffgeee'), 'num': list(range(10)), - 'date': np.arange('2017-01-01', '2017-01-11', dtype='datetime64[D]').astype( - 'datetime64[ns]') + 'date': pd.date_range('2017-01-01', periods=10, freq='D').values.astype('datetime64[ns]') }) cols = output_df.columns.tolist() output_table = pa.Table.from_pandas(output_df) @@ -997,7 +1002,7 @@ def _test_write_to_dataset_no_partitions(base_path, if filesystem is None: filesystem = LocalFileSystem() elif not isinstance(filesystem, FileSystem): - filesystem = PyFileSystem(FSSpecHandler(filesystem)) + filesystem = PyFileSystem(FSSpecHandler(filesystem)) # type: ignore[abstract] # Without partitions, append files to root_path n = 5 @@ -1009,8 +1014,10 @@ def _test_write_to_dataset_no_partitions(base_path, recursive=True) infos = filesystem.get_file_info(selector) - output_files = [info for info in infos if info.path.endswith(".parquet")] - assert len(output_files) == n + if isinstance(infos, list): + assert all(isinstance(info, FileInfo) for info in infos) + output_files = [info for info in infos if info.path.endswith(".parquet")] + assert len(output_files) == n # Deduplicated incoming DataFrame should match # original outgoing Dataframe @@ -1020,6 +1027,7 @@ def _test_write_to_dataset_no_partitions(base_path, input_df = input_table.to_pandas() input_df = 
input_df.drop_duplicates() input_df = input_df[cols] + assert isinstance(input_df, pd.DataFrame) tm.assert_frame_equal(output_df, input_df) @@ -1168,11 +1176,11 @@ def test_dataset_read_dictionary(tempdir): path, read_dictionary=['f0']).read() # The order of the chunks is non-deterministic - ex_chunks = [t1[0].chunk(0).dictionary_encode(), - t2[0].chunk(0).dictionary_encode()] + ex_chunks = [t1.column(0).chunk(0).dictionary_encode(), + t2.column(0).chunk(0).dictionary_encode()] - assert result[0].num_chunks == 2 - c0, c1 = result[0].chunk(0), result[0].chunk(1) + assert result.column(0).num_chunks == 2 + c0, c1 = result.column(0).chunk(0), result.column(0).chunk(1) if c0.equals(ex_chunks[0]): assert c1.equals(ex_chunks[1]) else: diff --git a/python/pyarrow/tests/parquet/test_datetime.py b/python/pyarrow/tests/parquet/test_datetime.py index b89fd97cb91e..a7652a01e64f 100644 --- a/python/pyarrow/tests/parquet/test_datetime.py +++ b/python/pyarrow/tests/parquet/test_datetime.py @@ -22,7 +22,7 @@ try: import numpy as np except ImportError: - np = None + pass import pytest import pyarrow as pa @@ -32,7 +32,7 @@ import pyarrow.parquet as pq from pyarrow.tests.parquet.common import _read_table, _write_table except ImportError: - pq = None + pass try: @@ -41,7 +41,7 @@ from pyarrow.tests.parquet.common import _roundtrip_pandas_dataframe except ImportError: - pd = tm = None + pass # Marks all of the tests in this module @@ -56,7 +56,7 @@ def test_pandas_parquet_datetime_tz(): # coerce to [ns] due to lack of non-[ns] support. 
s = pd.Series([datetime.datetime(2017, 9, 6)], dtype='datetime64[us]') s = s.dt.tz_localize('utc') - s.index = s + s.index = s # type: ignore[assignment] # Both a column and an index to hit both use cases df = pd.DataFrame({'tz_aware': s, @@ -287,7 +287,8 @@ def test_coerce_int96_timestamp_unit(unit): # For either Parquet version, coercing to nanoseconds is allowed # if Int96 storage is used - expected = pa.Table.from_arrays([arrays.get(unit)]*4, names) + array_for_unit = arrays.get(unit, a_ns) + expected = pa.Table.from_arrays([array_for_unit] * 4, names) read_table_kwargs = {"coerce_int96_timestamp_unit": unit} _check_roundtrip(table, expected, read_table_kwargs=read_table_kwargs, @@ -323,6 +324,7 @@ def get_table(pq_reader_method, filename, **kwargs): # with the default resolution of ns, we get wrong values for INT96 # that are out of bounds for nanosecond range tab_error = get_table(pq_reader_method, filename) + assert tab_error is not None with warnings.catch_warnings(): warnings.filterwarnings("ignore", "Discarding nonzero nanoseconds in conversion", @@ -333,6 +335,7 @@ def get_table(pq_reader_method, filename, **kwargs): tab_correct = get_table( pq_reader_method, filename, coerce_int96_timestamp_unit="s" ) + assert tab_correct is not None df_correct = tab_correct.to_pandas(timestamp_as_object=True) df["a"] = df["a"].astype(object) tm.assert_frame_equal(df, df_correct) diff --git a/python/pyarrow/tests/parquet/test_encryption.py b/python/pyarrow/tests/parquet/test_encryption.py index 4e2fb069bd06..82b934edf774 100644 --- a/python/pyarrow/tests/parquet/test_encryption.py +++ b/python/pyarrow/tests/parquet/test_encryption.py @@ -21,8 +21,7 @@ import pyarrow.parquet as pq import pyarrow.parquet.encryption as pe except ImportError: - pq = None - pe = None + pass else: from pyarrow.tests.parquet.encryption import (InMemoryKmsClient, MockVersioningKmsClient, @@ -131,7 +130,7 @@ def test_encrypted_parquet_write_read(tempdir, data_table): 
encryption_algorithm="AES_GCM_V1", cache_lifetime=timedelta(minutes=5.0), data_key_length_bits=256) - assert encryption_config.uniform_encryption is False + assert encryption_config.uniform_encryption is False # type: ignore[attr-defined] kms_connection_config, crypto_factory = write_encrypted_file( path, data_table, FOOTER_KEY_NAME, COL_KEY_NAME, FOOTER_KEY, COL_KEY, @@ -154,11 +153,11 @@ def test_uniform_encrypted_parquet_write_read(tempdir, data_table): # Encrypt the footer and all columns with the footer key, encryption_config = pe.EncryptionConfiguration( footer_key=FOOTER_KEY_NAME, - uniform_encryption=True, + uniform_encryption=True, # type: ignore[call-arg] encryption_algorithm="AES_GCM_V1", cache_lifetime=timedelta(minutes=5.0), data_key_length_bits=256) - assert encryption_config.uniform_encryption is True + assert encryption_config.uniform_encryption is True # type: ignore[attr-defined] kms_connection_config, crypto_factory = write_encrypted_file( path, data_table, FOOTER_KEY_NAME, COL_KEY_NAME, FOOTER_KEY, b"", @@ -303,7 +302,7 @@ def test_encrypted_parquet_write_col_key_and_uniform_encryption(tempdir, data_ta column_keys={ COL_KEY_NAME: ["a", "b"], }, - uniform_encryption=True) + uniform_encryption=True) # type: ignore[call-arg] with pytest.raises(OSError, match=r"Cannot set both column_keys and uniform_encryption"): @@ -415,7 +414,7 @@ def unwrap_key(self, wrapped_key, master_key_identifier): def kms_factory(kms_connection_configuration): return WrongTypeKmsClient(kms_connection_configuration) - crypto_factory = pe.CryptoFactory(kms_factory) + crypto_factory = pe.CryptoFactory(kms_factory) # type: ignore[arg-type] with pytest.raises(TypeError): # Write with encryption properties write_encrypted_parquet(path, data_table, encryption_config, @@ -554,7 +553,7 @@ def test_encrypted_parquet_write_read_external(tempdir, data_table, result_table = read_encrypted_parquet( path, decryption_config, kms_connection_config, crypto_factory, 
internal_key_material=False) - store = pa._parquet_encryption.FileSystemKeyMaterialStore.for_file(path) + store = pe.FileSystemKeyMaterialStore.for_file(path) assert len(key_ids := store.get_key_id_set()) == ( len(external_encryption_config.column_keys[COL_KEY_NAME]) + 1) diff --git a/python/pyarrow/tests/parquet/test_metadata.py b/python/pyarrow/tests/parquet/test_metadata.py index 148bfebaa67f..646873b3d4f1 100644 --- a/python/pyarrow/tests/parquet/test_metadata.py +++ b/python/pyarrow/tests/parquet/test_metadata.py @@ -19,11 +19,7 @@ import decimal from collections import OrderedDict import io - -try: - import numpy as np -except ImportError: - np = None +from typing import TYPE_CHECKING import pytest import pyarrow as pa @@ -31,20 +27,25 @@ from pyarrow.fs import LocalFileSystem from pyarrow.tests import util -try: - import pyarrow.parquet as pq - from pyarrow.tests.parquet.common import _write_table -except ImportError: - pq = None - - -try: +if TYPE_CHECKING: + import numpy as np import pandas as pd - import pandas.testing as tm - - from pyarrow.tests.parquet.common import alltypes_sample -except ImportError: - pd = tm = None + import pyarrow.parquet as pq + from pyarrow.tests.parquet.common import alltypes_sample, _write_table +else: + try: + import pyarrow.parquet as pq + from pyarrow.tests.parquet.common import _write_table, alltypes_sample + except ImportError: + pass + try: + import pandas as pd + except ImportError: + pass + try: + import numpy as np + except ImportError: + pass # Marks all of the tests in this module @@ -56,7 +57,7 @@ def test_parquet_metadata_api(): df = alltypes_sample(size=10000) df = df.reindex(columns=sorted(df.columns)) - df.index = np.random.randint(0, 1000000, size=len(df)) + df.index = np.random.randint(0, 1000000, size=len(df)) # type: ignore[assignment] fileh = make_sample_file(df) ncols = len(df.columns) @@ -80,15 +81,15 @@ def test_parquet_metadata_api(): col = schema[0] repr(col) - assert col.name == df.columns[0] - 
assert col.max_definition_level == 1 - assert col.max_repetition_level == 0 - assert col.max_repetition_level == 0 - assert col.physical_type == 'BOOLEAN' - assert col.converted_type == 'NONE' + assert col.name == df.columns[0] # type: ignore[attr-defined] + assert col.max_definition_level == 1 # type: ignore[attr-defined] + assert col.max_repetition_level == 0 # type: ignore[attr-defined] + assert col.max_repetition_level == 0 # type: ignore[attr-defined] + assert col.physical_type == 'BOOLEAN' # type: ignore[attr-defined] + assert col.converted_type == 'NONE' # type: ignore[attr-defined] col_float16 = schema[5] - assert col_float16.logical_type.type == 'FLOAT16' + assert col_float16.logical_type.type == 'FLOAT16' # type: ignore[attr-defined] with pytest.raises(IndexError): schema[ncols + 1] # +1 for index @@ -210,15 +211,16 @@ def test_parquet_column_statistics_api(data, type, physical_type, min_value, col_meta = rg_meta.column(0) stat = col_meta.statistics - assert stat.has_min_max - assert _close(type, stat.min, min_value) - assert _close(type, stat.max, max_value) - assert stat.null_count == null_count - assert stat.num_values == num_values + assert stat is not None + assert stat.has_min_max # type: ignore[attr-defined] + assert _close(type, stat.min, min_value) # type: ignore[attr-defined] + assert _close(type, stat.max, max_value) # type: ignore[attr-defined] + assert stat.null_count == null_count # type: ignore[attr-defined] + assert stat.num_values == num_values # type: ignore[attr-defined] # TODO(kszucs) until parquet-cpp API doesn't expose HasDistinctCount # method, missing distinct_count is represented as zero instead of None - assert stat.distinct_count == distinct_count - assert stat.physical_type == physical_type + assert stat.distinct_count == distinct_count # type: ignore[attr-defined] + assert stat.physical_type == physical_type # type: ignore[attr-defined] def _close(type, left, right): @@ -236,8 +238,10 @@ def 
test_parquet_raise_on_unset_statistics(): df = pd.DataFrame({"t": pd.Series([pd.NaT], dtype="datetime64[ns]")}) meta = make_sample_file(pa.Table.from_pandas(df)).metadata - assert not meta.row_group(0).column(0).statistics.has_min_max - assert meta.row_group(0).column(0).statistics.max is None + stat = meta.row_group(0).column(0).statistics + assert stat is not None + assert not stat.has_min_max + assert stat.max is None def test_statistics_convert_logical_types(tempdir): @@ -271,8 +275,9 @@ def test_statistics_convert_logical_types(tempdir): pq.write_table(t, path, version='2.6') pf = pq.ParquetFile(path) stats = pf.metadata.row_group(0).column(0).statistics - assert stats.min == min_val - assert stats.max == max_val + assert stats is not None + assert stats.min == min_val # type: ignore[attr-defined] + assert stats.max == max_val # type: ignore[attr-defined] def test_parquet_write_disable_statistics(tempdir): @@ -429,29 +434,36 @@ def test_field_id_metadata(): pf = pq.ParquetFile(pa.BufferReader(contents)) schema = pf.schema_arrow - assert schema[0].metadata[field_id] == b'1' - assert schema[0].metadata[b'other'] == b'abc' + assert schema[0].metadata is not None + assert schema[0].metadata[field_id] == b'1' # type: ignore[index] + assert schema[0].metadata[b'other'] == b'abc' # type: ignore[index] list_field = schema[1] - assert list_field.metadata[field_id] == b'11' + assert list_field.metadata is not None + assert list_field.metadata[field_id] == b'11' # type: ignore[index] list_item_field = list_field.type.value_field - assert list_item_field.metadata[field_id] == b'10' + assert list_item_field.metadata is not None + assert list_item_field.metadata[field_id] == b'10' # type: ignore[index] struct_field = schema[2] - assert struct_field.metadata[field_id] == b'102' + assert struct_field.metadata is not None + assert struct_field.metadata[field_id] == b'102' # type: ignore[index] struct_middle_field = struct_field.type[0] - assert 
struct_middle_field.metadata[field_id] == b'101' + assert struct_middle_field.metadata is not None + assert struct_middle_field.metadata[field_id] == b'101' # type: ignore[index] struct_inner_field = struct_middle_field.type[0] - assert struct_inner_field.metadata[field_id] == b'100' + assert struct_inner_field.metadata is not None + assert struct_inner_field.metadata[field_id] == b'100' # type: ignore[index] assert schema[3].metadata is None # Invalid input is passed through (ok) but does not # have field_id in parquet (not tested) - assert schema[4].metadata[field_id] == b'xyz' - assert schema[5].metadata[field_id] == b'-1000' + assert schema[4].metadata is not None + assert schema[4].metadata[field_id] == b'xyz' # type: ignore[index] + assert schema[5].metadata[field_id] == b'-1000' # type: ignore[index] def test_parquet_file_page_index(): @@ -495,13 +507,14 @@ def test_multi_dataset_metadata(tempdir): _meta.append_row_groups(meta[0]) # Write merged metadata-only file + assert _meta is not None with open(metapath, "wb") as f: - _meta.write_metadata_file(f) + _meta.write_metadata_file(f) # type: ignore[union-attr] # Read back the metadata meta = pq.read_metadata(metapath) md = meta.to_dict() - _md = _meta.to_dict() + _md = _meta.to_dict() # type: ignore[union-attr] for key in _md: if key != 'serialized_size': assert _md[key] == md[key] @@ -695,13 +708,14 @@ def test_metadata_schema_filesystem(tempdir): assert pq.read_metadata( file_path, filesystem=LocalFileSystem()).equals(metadata) assert pq.read_metadata( + # type: ignore[arg-type] fname, filesystem=f'file:///{tempdir}').equals(metadata) assert pq.read_schema(file_uri).equals(schema) assert pq.read_schema( file_path, filesystem=LocalFileSystem()).equals(schema) assert pq.read_schema( - fname, filesystem=f'file:///{tempdir}').equals(schema) + fname, filesystem=f'file:///{tempdir}').equals(schema) # type: ignore[arg-type] with util.change_cwd(tempdir): # Pass `filesystem` arg @@ -721,7 +735,7 @@ def 
test_metadata_equals(): original_metadata = pq.read_metadata(pa.BufferReader(buf)) match = "Argument 'other' has incorrect type" with pytest.raises(TypeError, match=match): - original_metadata.equals(None) + original_metadata.equals(None) # type: ignore[arg-type] @pytest.mark.parametrize("t1,t2,expected_error", ( @@ -810,7 +824,7 @@ def msg(c): pq.ColumnChunkMetaData() with pytest.raises(TypeError, match=msg("RowGroupMetaData")): - pq.RowGroupMetaData() + pq.RowGroupMetaData() # type: ignore[call-arg] with pytest.raises(TypeError, match=msg("FileMetaData")): - pq.FileMetaData() + pq.FileMetaData() # type: ignore[call-arg] diff --git a/python/pyarrow/tests/parquet/test_pandas.py b/python/pyarrow/tests/parquet/test_pandas.py index 53864ff15ea2..91ae23857344 100644 --- a/python/pyarrow/tests/parquet/test_pandas.py +++ b/python/pyarrow/tests/parquet/test_pandas.py @@ -17,11 +17,12 @@ import io import json +from typing import TYPE_CHECKING, cast try: import numpy as np except ImportError: - np = None + pass import pytest import pyarrow as pa @@ -29,22 +30,29 @@ from pyarrow.util import guid from pyarrow.vendored.version import Version -try: - import pyarrow.parquet as pq - from pyarrow.tests.parquet.common import (_read_table, _test_dataframe, - _write_table) -except ImportError: - pq = None - - -try: +if TYPE_CHECKING: import pandas as pd import pandas.testing as tm + import pyarrow.parquet as pq + from pyarrow.tests.parquet.common import ( + _read_table, _roundtrip_pandas_dataframe, _test_dataframe, + _write_table, alltypes_sample + ) +else: + try: + import pyarrow.parquet as pq + from pyarrow.tests.parquet.common import ( + _read_table, _test_dataframe, _write_table, alltypes_sample, + _roundtrip_pandas_dataframe + ) - from pyarrow.tests.parquet.common import (_roundtrip_pandas_dataframe, - alltypes_sample) -except ImportError: - pd = tm = None + except ImportError: + pass + try: + import pandas as pd + import pandas.testing as tm + except ImportError: + pass # Marks 
all of the tests in this module @@ -58,11 +66,14 @@ def test_pandas_parquet_custom_metadata(tempdir): filename = tempdir / 'pandas_roundtrip.parquet' arrow_table = pa.Table.from_pandas(df) + assert arrow_table.schema.metadata is not None assert b'pandas' in arrow_table.schema.metadata _write_table(arrow_table, filename) - metadata = pq.read_metadata(filename).metadata + file_metadata = pq.read_metadata(filename) + metadata = file_metadata.metadata + assert metadata is not None assert b'pandas' in metadata js = json.loads(metadata[b'pandas'].decode('utf8')) @@ -117,10 +128,13 @@ def test_attributes_metadata_persistence(tempdir): } table = pa.Table.from_pandas(df) + assert table.schema.metadata is not None assert b'attributes' in table.schema.metadata[b'pandas'] _write_table(table, filename) - metadata = pq.read_metadata(filename).metadata + file_metadata = pq.read_metadata(filename) + metadata = file_metadata.metadata + assert metadata is not None js = json.loads(metadata[b'pandas'].decode('utf8')) assert 'attributes' in js assert js['attributes'] == df.attrs @@ -297,8 +311,8 @@ def test_pandas_parquet_configuration_options(tempdir): @pytest.mark.pandas def test_spark_flavor_preserves_pandas_metadata(): df = _test_dataframe(size=100) - df.index = np.arange(0, 10 * len(df), 10) - df.index.name = 'foo' + df.index = np.arange(0, 10 * len(df), 10) # type: ignore[assignment] + df.index.name = 'foo' # type: ignore[attr-defined] result = _roundtrip_pandas_dataframe(df, {'flavor': 'spark'}) tm.assert_frame_equal(result, df) @@ -450,7 +464,9 @@ def test_backwards_compatible_column_metadata_handling(datadir): table = _read_table( path, columns=['a']) result = table.to_pandas() - tm.assert_frame_equal(result, expected[['a']].reset_index(drop=True)) + expected_df = expected[['a']].reset_index(drop=True) + assert isinstance(expected_df, pd.DataFrame) + tm.assert_frame_equal(result, expected_df) @pytest.mark.pandas @@ -510,7 +526,7 @@ def test_pandas_categorical_roundtrip(): 
codes = np.array([2, 0, 0, 2, 0, -1, 2], dtype='int32') categories = ['foo', 'bar', 'baz'] df = pd.DataFrame({'x': pd.Categorical.from_codes( - codes, categories=categories)}) + codes, categories=categories)}) # type: ignore[arg-type] buf = pa.BufferOutputStream() pq.write_table(pa.table(df), buf) @@ -555,15 +571,18 @@ def test_write_to_dataset_pandas_preserve_extensiondtypes(tempdir): table, str(tempdir / "case1"), partition_cols=['part'], ) result = pq.read_table(str(tempdir / "case1")).to_pandas() - tm.assert_frame_equal(result[["col"]], df[["col"]]) + tm.assert_frame_equal( + result[["col"]], df[["col"]]) pq.write_to_dataset(table, str(tempdir / "case2")) result = pq.read_table(str(tempdir / "case2")).to_pandas() - tm.assert_frame_equal(result[["col"]], df[["col"]]) + tm.assert_frame_equal( + result[["col"]], df[["col"]]) pq.write_table(table, str(tempdir / "data.parquet")) result = pq.read_table(str(tempdir / "data.parquet")).to_pandas() - tm.assert_frame_equal(result[["col"]], df[["col"]]) + tm.assert_frame_equal( + result[["col"]], df[["col"]]) @pytest.mark.pandas diff --git a/python/pyarrow/tests/parquet/test_parquet_file.py b/python/pyarrow/tests/parquet/test_parquet_file.py index a62b5c3298c9..3c5182dc56e9 100644 --- a/python/pyarrow/tests/parquet/test_parquet_file.py +++ b/python/pyarrow/tests/parquet/test_parquet_file.py @@ -30,15 +30,14 @@ import pyarrow.parquet as pq from pyarrow.tests.parquet.common import _write_table except ImportError: - pq = None + pass try: - import pandas as pd import pandas.testing as tm from pyarrow.tests.parquet.common import alltypes_sample except ImportError: - pd = tm = None + pass # Marks all of the tests in this module @@ -172,7 +171,7 @@ def test_scan_contents(): pf = pq.ParquetFile(buf) assert pf.scan_contents() == 10000 - assert pf.scan_contents(df.columns[:4]) == 10000 + assert pf.scan_contents(list(df.columns[:4])) == 10000 def test_parquet_file_pass_directory_instead_of_file(tempdir): @@ -215,7 +214,7 @@ def 
test_iter_batches_columns_reader(tempdir, batch_size): chunk_size=chunk_size) file_ = pq.ParquetFile(filename) - for columns in [df.columns[:10], df.columns[10:]]: + for columns in [list(df.columns[:10]), list(df.columns[10:])]: batches = file_.iter_batches(batch_size=batch_size, columns=columns) batch_starts = range(0, total_size+batch_size, batch_size) for batch, start in zip(batches, batch_starts): @@ -263,9 +262,10 @@ def get_all_batches(f): tm.assert_frame_equal( batches[batch_no].to_pandas().reset_index(drop=True), - file_.read_row_groups([i]).to_pandas().iloc[900:].reset_index( - drop=True - ) + file_ + .read_row_groups([i]) + .to_pandas().iloc[900:] + .reset_index(drop=True) # type: ignore[arg-type] ) batch_no += 1 @@ -346,6 +346,7 @@ def test_read_statistics(): buf.seek(0) statistics = pq.ParquetFile(buf).read().columns[0].chunks[0].statistics + assert statistics is not None assert statistics.is_null_count_exact is True assert statistics.null_count == 1 assert statistics.distinct_count is None @@ -389,7 +390,8 @@ def test_parquet_file_fsspec_support(): def test_parquet_file_fsspec_support_through_filesystem_argument(): try: - from fsspec.implementations.memory import MemoryFileSystem + from fsspec.implementations.memory import ( # type: ignore[import-untyped] + MemoryFileSystem) except ImportError: pytest.skip("fsspec is not installed, skipping test") @@ -412,7 +414,7 @@ def test_parquet_file_hugginface_support(): pytest.skip("fsspec is not installed, skipping Hugging Face test") fake_hf_module = types.ModuleType("huggingface_hub") - fake_hf_module.HfFileSystem = MemoryFileSystem + fake_hf_module.HfFileSystem = MemoryFileSystem # type: ignore[attr-defined] with mock.patch.dict("sys.modules", {"huggingface_hub": fake_hf_module}): uri = "hf://datasets/apache/arrow/test.parquet" table = pa.table({"a": range(10)}) @@ -424,7 +426,7 @@ def test_parquet_file_hugginface_support(): def test_fsspec_uri_raises_if_fsspec_is_not_available(): # sadly cannot patch 
sys.modules because cython will still be able to import fsspec try: - import fsspec # noqa: F401 + import fsspec # type: ignore[import-untyped] # noqa: F401 except ImportError: pass else: diff --git a/python/pyarrow/tests/parquet/test_parquet_writer.py b/python/pyarrow/tests/parquet/test_parquet_writer.py index a49441f09f45..87787a0f3f00 100644 --- a/python/pyarrow/tests/parquet/test_parquet_writer.py +++ b/python/pyarrow/tests/parquet/test_parquet_writer.py @@ -23,9 +23,10 @@ try: import pyarrow.parquet as pq from pyarrow.tests.parquet.common import (_read_table, _test_dataframe, + # type: ignore[attr-defined] _test_table, _range_integers) except ImportError: - pq = None + pass try: @@ -33,7 +34,7 @@ import pandas.testing as tm except ImportError: - pd = tm = None + pass # Marks all of the tests in this module @@ -94,10 +95,10 @@ def test_parquet_invalid_writer(tempdir): # avoid segfaults with invalid construction with pytest.raises(TypeError): some_schema = pa.schema([pa.field("x", pa.int32())]) - pq.ParquetWriter(None, some_schema) + pq.ParquetWriter(None, some_schema) # type: ignore[arg-type] with pytest.raises(TypeError): - pq.ParquetWriter(tempdir / "some_path", None) + pq.ParquetWriter(tempdir / "some_path", None) # type: ignore[arg-type] @pytest.mark.pandas @@ -335,6 +336,7 @@ def test_parquet_writer_store_schema(tempdir): writer.write_table(table) meta = pq.read_metadata(path1) + assert meta.metadata is not None assert b'ARROW:schema' in meta.metadata assert meta.metadata[b'ARROW:schema'] @@ -357,6 +359,7 @@ def test_parquet_writer_append_key_value_metadata(tempdir): writer.add_key_value_metadata({'key2': '2', 'key3': '3'}) reader = pq.ParquetFile(path) metadata = reader.metadata.metadata + assert metadata is not None assert metadata[b'key1'] == b'1' assert metadata[b'key2'] == b'2' assert metadata[b'key3'] == b'3' diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index d00c0c4b3eb9..ce913612bad5 100644 --- 
a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -32,7 +32,7 @@ try: import numpy as np except ImportError: - np = None + pass import pytest import pyarrow as pa @@ -40,6 +40,7 @@ import pyarrow.csv import pyarrow.feather import pyarrow.fs as fs +from pyarrow.fs import FileInfo import pyarrow.json from pyarrow.lib import is_threading_enabled from pyarrow.tests.util import (FSProtocolClass, ProxyHandler, @@ -49,17 +50,17 @@ try: import pandas as pd except ImportError: - pd = None + pass try: import pyarrow.dataset as ds except ImportError: - ds = None + pass try: import pyarrow.parquet as pq except ImportError: - pq = None + pass # Marks all of the tests in this module # Ignore these with pytest ... -m 'not dataset' @@ -395,14 +396,16 @@ def test_filesystem_dataset(mockfs): # validation of required arguments with pytest.raises(TypeError, match="incorrect type"): - ds.FileSystemDataset(fragments, file_format, schema) + ds.FileSystemDataset(fragments, file_format, schema) # type: ignore[arg-type] # validation of root_partition with pytest.raises(TypeError, match="incorrect type"): - ds.FileSystemDataset(fragments, schema=schema, - format=file_format, root_partition=1) + ds.FileSystemDataset( + fragments, schema=schema, format=file_format, + root_partition=1) # type: ignore[arg-type] # missing required argument in from_paths with pytest.raises(TypeError, match="incorrect type"): - ds.FileSystemDataset.from_paths(fragments, format=file_format) + ds.FileSystemDataset.from_paths( + fragments, format=file_format) # type: ignore[arg-type] def test_filesystem_dataset_no_filesystem_interaction(dataset_reader): @@ -827,7 +830,8 @@ def test_partitioning(): load_back = None with pytest.raises(ValueError, match="Expected Partitioning or PartitioningFactory"): - load_back = ds.dataset(tempdir, format='ipc', partitioning=int(0)) + load_back = ds.dataset( + tempdir, format='ipc', partitioning=int(0)) # type: ignore[arg-type] assert load_back is 
None @@ -859,8 +863,8 @@ def test_partitioning_pickling(pickle_module): ) def test_dataset_partitioning_format( flavor: str, - expected_defined_partition: tuple, - expected_undefined_partition: tuple, + expected_defined_partition: tuple[str], + expected_undefined_partition: tuple[str], ): partitioning_schema = pa.schema([("foo", pa.string()), ("bar", pa.string())]) @@ -1215,6 +1219,7 @@ def test_make_fragment(multisourcefs): parquet_format = ds.ParquetFileFormat() dataset = ds.dataset('/plain', filesystem=multisourcefs, format=parquet_format) + assert isinstance(dataset, ds.FileSystemDataset) for path in dataset.files: fragment = parquet_format.make_fragment(path, multisourcefs) @@ -1252,7 +1257,9 @@ def test_make_fragment_with_size(s3_example_simple): assert tbl.equals(table) # true sizes -> works - sizes_true = [dataset.filesystem.get_file_info(x).size for x in dataset.files] + dataset_file_info = [dataset.filesystem.get_file_info(x) for x in dataset.files] + sizes_true = [x.size if isinstance( + x, FileInfo) else None for x in dataset_file_info] fragments_with_size = [file_format.make_fragment(path, fs, file_size=size) for path, size in zip(paths, sizes_true)] dataset_with_size = ds.FileSystemDataset( @@ -1943,6 +1950,7 @@ def test_fragments_repr(tempdir, dataset): # single-file parquet dataset (no partition information in repr) table, path = _create_single_file(tempdir) dataset = ds.dataset(path, format="parquet") + assert isinstance(dataset, ds.FileSystemDataset) fragment = list(dataset.get_fragments())[0] assert ( repr(fragment) == @@ -1954,6 +1962,7 @@ def test_fragments_repr(tempdir, dataset): path = tempdir / "data.feather" pa.feather.write_feather(table, path) dataset = ds.dataset(path, format="feather") + assert isinstance(dataset, ds.FileSystemDataset) fragment = list(dataset.get_fragments())[0] assert ( repr(fragment) == @@ -2065,7 +2074,7 @@ def test_partitioning_factory_segment_encoding(pickled, pickle_module): actual = 
factory.finish().to_table(columns={ "date_int": ds.field("date").cast(pa.int64()), }) - assert actual[0][0].as_py() == 1620086400 + assert actual.column(0).chunk(0)[0].as_py() == 1620086400 partitioning_factory = ds.DirectoryPartitioning.discover( ["date", "string"], segment_encoding="none") @@ -2105,7 +2114,7 @@ def test_partitioning_factory_segment_encoding(pickled, pickle_module): actual = factory.finish().to_table(columns={ "date_int": ds.field("date").cast(pa.int64()), }) - assert actual[0][0].as_py() == 1620086400 + assert actual.column(0).chunk(0)[0].as_py() == 1620086400 partitioning_factory = ds.HivePartitioning.discover( segment_encoding="none") @@ -2173,7 +2182,7 @@ def test_partitioning_factory_hive_segment_encoding_key_encoded(pickled, pickle_ actual = factory.finish().to_table(columns={ "date_int": ds.field("test'; date").cast(pa.int64()), }) - assert actual[0][0].as_py() == 1620086400 + assert actual.column(0).chunk(0)[0].as_py() == 1620086400 partitioning_factory = ds.HivePartitioning.discover( segment_encoding="uri") @@ -2231,7 +2240,7 @@ def test_dictionary_partitioning_outer_nulls_raises(tempdir): def test_positional_keywords_raises(tempdir): table = pa.table({'a': ['x', 'y', None], 'b': ['x', 'y', 'z']}) with pytest.raises(TypeError): - ds.write_dataset(table, tempdir, "basename-{i}.arrow") + ds.write_dataset(table, tempdir, "basename-{i}.arrow") # type: ignore[arg-type] @pytest.mark.parquet @@ -2245,20 +2254,20 @@ def test_read_partition_keys_only(tempdir): 'key': pa.repeat(0, BATCH_SIZE + 1), 'value': np.arange(BATCH_SIZE + 1)}) pq.write_to_dataset( - table[:BATCH_SIZE], + table[:BATCH_SIZE], # type: ignore[arg-type] tempdir / 'one', partition_cols=['key']) pq.write_to_dataset( - table[:BATCH_SIZE + 1], + table[:BATCH_SIZE + 1], # type: ignore[arg-type] tempdir / 'two', partition_cols=['key']) table = pq.read_table(tempdir / 'one', columns=['key']) - assert table['key'].num_chunks == 1 + assert table.column('key').num_chunks == 1 table = 
pq.read_table(tempdir / 'two', columns=['key', 'value']) - assert table['key'].num_chunks == 2 + assert table.column('key').num_chunks == 2 table = pq.read_table(tempdir / 'two', columns=['key']) - assert table['key'].num_chunks == 2 + assert table.column('key').num_chunks == 2 def _has_subdirs(basedir): @@ -2319,9 +2328,9 @@ def test_partitioning_function(): with pytest.raises(ValueError): ds.partitioning() with pytest.raises(ValueError, match="Expected list"): - ds.partitioning(field_names=schema) + ds.partitioning(field_names=schema) # type: ignore[arg-type] with pytest.raises(ValueError, match="Cannot specify both"): - ds.partitioning(schema, field_names=schema) + ds.partitioning(schema, field_names=schema) # type: ignore[call-overload] # Hive partitioning part = ds.partitioning(schema, flavor="hive") @@ -2332,13 +2341,13 @@ def test_partitioning_function(): assert isinstance(part, ds.PartitioningFactory) # cannot pass list of names with pytest.raises(ValueError): - ds.partitioning(names, flavor="hive") + ds.partitioning(names, flavor="hive") # type: ignore[arg-type] with pytest.raises(ValueError, match="Cannot specify 'field_names'"): ds.partitioning(field_names=names, flavor="hive") # unsupported flavor with pytest.raises(ValueError): - ds.partitioning(schema, flavor="unsupported") + ds.partitioning(schema, flavor="unsupported") # type: ignore[arg-type] @pytest.mark.parquet @@ -2353,6 +2362,8 @@ def test_directory_partitioning_dictionary_key(mockfs): dataset = ds.dataset( "subdir", format="parquet", filesystem=mockfs, partitioning=part ) + assert isinstance(dataset, ds.FileSystemDataset) + assert dataset.partitioning is not None assert dataset.partitioning.schema == schema table = dataset.to_table() @@ -2373,6 +2384,8 @@ def test_hive_partitioning_dictionary_key(multisourcefs): dataset = ds.dataset( "hive", format="parquet", filesystem=multisourcefs, partitioning=part ) + assert isinstance(dataset, ds.FileSystemDataset) + assert dataset.partitioning is not 
None assert dataset.partitioning.schema == schema table = dataset.to_table() @@ -2380,11 +2393,13 @@ def test_hive_partitioning_dictionary_key(multisourcefs): month_dictionary = list(range(1, 13)) assert table.column('year').type.equals(schema.types[0]) for chunk in table.column('year').chunks: + assert isinstance(chunk, pa.DictionaryArray) actual = chunk.dictionary.to_pylist() actual.sort() assert actual == year_dictionary assert table.column('month').type.equals(schema.types[1]) for chunk in table.column('month').chunks: + assert isinstance(chunk, pa.DictionaryArray) actual = chunk.dictionary.to_pylist() actual.sort() assert actual == month_dictionary @@ -2574,6 +2589,8 @@ def test_construct_from_mixed_child_datasets(mockfs): 'subdir/2/yyy/file1.parquet'], filesystem=mockfs) b = ds.dataset('subdir', filesystem=mockfs) + assert isinstance(a, ds.FileSystemDataset) + assert isinstance(b, ds.FileSystemDataset) dataset = ds.dataset([a, b]) assert isinstance(dataset, ds.UnionDataset) @@ -2585,8 +2602,8 @@ def test_construct_from_mixed_child_datasets(mockfs): assert len(dataset.children) == 2 for child in dataset.children: - assert child.files == ['subdir/1/xxx/file0.parquet', - 'subdir/2/yyy/file1.parquet'] + assert child.files == [ # type: ignore[attr-defined] + 'subdir/1/xxx/file0.parquet', 'subdir/2/yyy/file1.parquet'] def test_construct_empty_dataset(): @@ -2620,7 +2637,7 @@ def test_construct_from_invalid_sources_raise(multisourcefs): batch2 = pa.RecordBatch.from_arrays([pa.array(range(10))], names=["b"]) with pytest.raises(TypeError, match='Expected.*FileSystemDatasetFactory'): - ds.dataset([child1, child2]) + ds.dataset([child1, child2]) # type: ignore[arg-type] expected = ( "Expected a list of path-like or dataset objects, or a list " @@ -2628,14 +2645,14 @@ def test_construct_from_invalid_sources_raise(multisourcefs): "types: int" ) with pytest.raises(TypeError, match=expected): - ds.dataset([1, 2, 3]) + ds.dataset([1, 2, 3]) # type: ignore[arg-type] expected 
= ( "Expected a path-like, list of path-likes or a list of Datasets " "instead of the given type: NoneType" ) with pytest.raises(TypeError, match=expected): - ds.dataset(None) + ds.dataset(None) # type: ignore[arg-type] expected = ( "Expected a path-like, list of path-likes or a list of Datasets " @@ -2662,7 +2679,7 @@ def test_construct_from_invalid_sources_raise(multisourcefs): "batches or tables. The given list contains the following types:" ) with pytest.raises(TypeError, match=expected): - ds.dataset([batch1, 0]) + ds.dataset([batch1, 0]) # type: ignore[arg-type] expected = ( "Expected a list of tables or batches. The given list contains a int" @@ -2752,7 +2769,7 @@ def test_open_dataset_partitioned_directory(tempdir, dataset_reader, pickle_modu dataset = ds.dataset( str(path), partitioning=ds.partitioning( - pa.schema([("part", pa.int8())]), flavor="hive")) + schema=pa.schema([("part", pa.int8())]), flavor="hive")) expected_schema = table.schema.append(pa.field("part", pa.int8())) assert dataset.schema.equals(expected_schema) @@ -2797,7 +2814,7 @@ def test_open_union_dataset(tempdir, dataset_reader, pickle_module): _, path = _create_single_file(tempdir) dataset = ds.dataset(path) - union = ds.dataset([dataset, dataset]) + union = ds.dataset([dataset, dataset]) # type: ignore[arg-type] assert isinstance(union, ds.UnionDataset) pickled = pickle_module.loads(pickle_module.dumps(union)) @@ -2807,7 +2824,7 @@ def test_open_union_dataset(tempdir, dataset_reader, pickle_module): def test_open_union_dataset_with_additional_kwargs(multisourcefs): child = ds.dataset('/plain', filesystem=multisourcefs, format='parquet') with pytest.raises(ValueError, match="cannot pass any additional"): - ds.dataset([child], format="parquet") + ds.dataset([child], format="parquet") # type: ignore[arg-type] def test_open_dataset_non_existing_file(): @@ -2894,7 +2911,7 @@ def expected_type(key): def test_dataset_partitioned_dictionary_type_reconstruct(tempdir, pickle_module): # 
https://issues.apache.org/jira/browse/ARROW-11400 table = pa.table({'part': np.repeat(['A', 'B'], 5), 'col': range(10)}) - part = ds.partitioning(table.select(['part']).schema, flavor="hive") + part = ds.partitioning(schema=table.select(['part']).schema, flavor="hive") ds.write_dataset(table, tempdir, partitioning=part, format="feather") dataset = ds.dataset( @@ -2902,7 +2919,7 @@ def test_dataset_partitioned_dictionary_type_reconstruct(tempdir, pickle_module) partitioning=ds.HivePartitioning.discover(infer_dictionary=True) ) expected = pa.table( - {'col': table['col'], 'part': table['part'].dictionary_encode()} + {'col': table.column('col'), 'part': table.column('part').dictionary_encode()} ) assert dataset.to_table().equals(expected) fragment = list(dataset.get_fragments())[0] @@ -2987,7 +3004,7 @@ def test_open_dataset_from_uri_s3_fsspec(s3_example_simple): assert dataset.to_table().equals(table) # directly passing the fsspec-handler - fs = PyFileSystem(FSSpecHandler(fs)) + fs = PyFileSystem(FSSpecHandler(fs)) # type: ignore[abstract] dataset = ds.dataset(path, format="parquet", filesystem=fs) assert dataset.to_table().equals(table) @@ -3089,7 +3106,7 @@ def test_file_format_inspect_fsspec(tempdir): format = ds.ParquetFileFormat() # manually creating a PyFileSystem instead of using fs._ensure_filesystem # which would convert an fsspec local filesystem to a native one - filesystem = fs.PyFileSystem(fs.FSSpecHandler(fsspec_fs)) + filesystem = fs.PyFileSystem(fs.FSSpecHandler(fsspec_fs)) # type: ignore[abstract] schema = format.inspect(path, filesystem) assert schema.equals(table.schema) @@ -3107,11 +3124,11 @@ def test_filter_timestamp(tempdir, dataset_reader): "id": range(10)}) # write dataset partitioned on dates (as strings) - part = ds.partitioning(table.select(['dates']).schema, flavor="hive") + part = ds.partitioning(schema=table.select(['dates']).schema, flavor="hive") ds.write_dataset(table, path, partitioning=part, format="feather") # read dataset 
partitioned on dates (as timestamps) - part = ds.partitioning(pa.schema([("dates", pa.timestamp("s"))]), + part = ds.partitioning(schema=pa.schema([("dates", pa.timestamp("s"))]), flavor="hive") dataset = ds.dataset(path, format="feather", partitioning=part) @@ -3162,7 +3179,7 @@ def test_filter_compute_expression(tempdir, dataset_reader): filter_ = pc.is_in(ds.field('A'), pa.array(["a", "b"])) assert dataset_reader.to_table(dataset, filter=filter_).num_rows == 3 - filter_ = pc.hour(ds.field('B')) >= 3 + filter_ = pc.hour(ds.field('B')) >= 3 # type: ignore[operator] assert dataset_reader.to_table(dataset, filter=filter_).num_rows == 2 days = pc.days_between(ds.field('B'), ds.field("C")) @@ -3194,12 +3211,12 @@ def test_union_dataset_from_other_datasets(tempdir, multisourcefs): assert child1.schema != child2.schema != child3.schema - assembled = ds.dataset([child1, child2, child3]) + assembled = ds.dataset([child1, child2, child3]) # type: ignore[arg-type] assert isinstance(assembled, ds.UnionDataset) msg = 'cannot pass any additional arguments' with pytest.raises(ValueError, match=msg): - ds.dataset([child1, child2], filesystem=multisourcefs) + ds.dataset([child1, child2], filesystem=multisourcefs) # type: ignore[arg-type] expected_schema = pa.schema([ ('date', pa.date32()), @@ -3213,7 +3230,7 @@ def test_union_dataset_from_other_datasets(tempdir, multisourcefs): assert assembled.schema.equals(expected_schema) assert assembled.to_table().schema.equals(expected_schema) - assembled = ds.dataset([child1, child3]) + assembled = ds.dataset([child1, child3]) # type: ignore[arg-type] expected_schema = pa.schema([ ('date', pa.date32()), ('index', pa.int64()), @@ -3230,6 +3247,7 @@ def test_union_dataset_from_other_datasets(tempdir, multisourcefs): ('color', pa.string()), ('date', pa.date32()), ]) + # type: ignore[arg-type] assembled = ds.dataset([child1, child3], schema=expected_schema) assert assembled.to_table().schema.equals(expected_schema) @@ -3238,6 +3256,7 @@ def 
test_union_dataset_from_other_datasets(tempdir, multisourcefs): ('color', pa.string()), ('unknown', pa.string()) # fill with nulls ]) + # type: ignore[arg-type] assembled = ds.dataset([child1, child3], schema=expected_schema) assert assembled.to_table().schema.equals(expected_schema) @@ -3248,7 +3267,7 @@ def test_union_dataset_from_other_datasets(tempdir, multisourcefs): child4 = ds.dataset(path) with pytest.raises(pa.ArrowTypeError, match='Unable to merge'): - ds.dataset([child1, child4]) + ds.dataset([child1, child4]) # type: ignore[arg-type] def test_dataset_from_a_list_of_local_directories_raises(multisourcefs): @@ -3259,7 +3278,7 @@ def test_dataset_from_a_list_of_local_directories_raises(multisourcefs): def test_union_dataset_filesystem_datasets(multisourcefs): # without partitioning - dataset = ds.dataset([ + dataset = ds.dataset([ # type: ignore[arg-type] ds.dataset('/plain', filesystem=multisourcefs), ds.dataset('/schema', filesystem=multisourcefs), ds.dataset('/hive', filesystem=multisourcefs), @@ -3273,7 +3292,7 @@ def test_union_dataset_filesystem_datasets(multisourcefs): assert dataset.schema.equals(expected_schema) # with hive partitioning for two hive sources - dataset = ds.dataset([ + dataset = ds.dataset([ # type: ignore[arg-type] ds.dataset('/plain', filesystem=multisourcefs), ds.dataset('/schema', filesystem=multisourcefs), ds.dataset('/hive', filesystem=multisourcefs, partitioning='hive') @@ -3333,7 +3352,7 @@ def _check_dataset(schema, expected, expected_schema=None): # Specifying with differing field types schema = pa.schema([('a', 'int32'), ('b', 'float64')]) dataset = ds.dataset(str(tempdir / "data.parquet"), schema=schema) - expected = pa.table([table['a'].cast('int32'), + expected = pa.table([table['a'].cast('int32'), # type: ignore[arg-type] table['b']], names=['a', 'b']) _check_dataset(schema, expected) @@ -3834,7 +3853,7 @@ def test_parquet_dataset_factory_fsspec(tempdir): fsspec_fs = fsspec.filesystem("file") # manually creating a 
PyFileSystem, because passing the local fsspec # filesystem would internally be converted to native LocalFileSystem - filesystem = fs.PyFileSystem(fs.FSSpecHandler(fsspec_fs)) + filesystem = fs.PyFileSystem(fs.FSSpecHandler(fsspec_fs)) # type: ignore[abstract] dataset = ds.parquet_dataset(metadata_path, filesystem=filesystem) assert dataset.schema.equals(table.schema) assert len(dataset.files) == 4 @@ -4042,12 +4061,14 @@ def test_filter_mismatching_schema(tempdir, dataset_reader): # filtering on a column with such type mismatch should implicitly # cast the column filtered = dataset_reader.to_table(dataset, filter=ds.field("col") > 2) - assert filtered["col"].equals(table["col"].cast('int64').slice(2)) + assert filtered["col"].equals(table["col"].cast( + 'int64').slice(2)) # type: ignore[arg-type] fragment = list(dataset.get_fragments())[0] filtered = dataset_reader.to_table( fragment, filter=ds.field("col") > 2, schema=schema) - assert filtered["col"].equals(table["col"].cast('int64').slice(2)) + assert filtered["col"].equals(table["col"].cast( + 'int64').slice(2)) # type: ignore[arg-type] @pytest.mark.parquet @@ -4112,6 +4133,7 @@ def test_dataset_preserved_partitioning(tempdir): # through discovery, but without partitioning _, path = _create_single_file(tempdir) dataset = ds.dataset(path) + assert isinstance(dataset, ds.FileSystemDataset) assert isinstance(dataset.partitioning, ds.DirectoryPartitioning) # TODO(GH-34884) partitioning attribute not preserved in pickling # dataset_ = ds.dataset(path) @@ -4121,10 +4143,12 @@ def test_dataset_preserved_partitioning(tempdir): # through discovery, with hive partitioning but not specified full_table, path = _create_partitioned_dataset(tempdir) dataset = ds.dataset(path) + assert isinstance(dataset, ds.FileSystemDataset) assert isinstance(dataset.partitioning, ds.DirectoryPartitioning) # through discovery, with hive partitioning (from a partitioning factory) dataset = ds.dataset(path, partitioning="hive") + assert 
isinstance(dataset, ds.FileSystemDataset) part = dataset.partitioning assert part is not None assert isinstance(part, ds.HivePartitioning) @@ -4133,11 +4157,12 @@ def test_dataset_preserved_partitioning(tempdir): assert part.dictionaries[0] == pa.array([0, 1, 2], pa.int32()) # through discovery, with hive partitioning (from a partitioning object) - part = ds.partitioning(pa.schema([("part", pa.int32())]), flavor="hive") + part = ds.partitioning(schema=pa.schema([("part", pa.int32())]), flavor="hive") assert isinstance(part, ds.HivePartitioning) # not a factory assert len(part.dictionaries) == 1 assert all(x is None for x in part.dictionaries) dataset = ds.dataset(path, partitioning=part) + assert isinstance(dataset, ds.FileSystemDataset) part = dataset.partitioning assert isinstance(part, ds.HivePartitioning) assert part.schema == pa.schema([("part", pa.int32())]) @@ -4147,6 +4172,7 @@ def test_dataset_preserved_partitioning(tempdir): # through manual creation -> not available dataset = ds.dataset(path, partitioning="hive") + assert isinstance(dataset, ds.FileSystemDataset) dataset2 = ds.FileSystemDataset( list(dataset.get_fragments()), schema=dataset.schema, format=dataset.format, filesystem=dataset.filesystem @@ -4192,7 +4218,7 @@ def _sort_table(tab, sort_col): import pyarrow.compute as pc sorted_indices = pc.sort_indices( tab, options=pc.SortOptions([(sort_col, 'ascending')])) - return pc.take(tab, sorted_indices) + return pc.take(tab, sorted_indices) # type: ignore[arg-type] def _check_dataset_roundtrip(dataset, base_dir, expected_files, sort_col, @@ -4265,7 +4291,7 @@ def test_write_dataset_partitioned(tempdir): target / "part=b", target / "part=b" / "part-0.arrow" ] partitioning_schema = ds.partitioning( - pa.schema([("part", pa.string())]), flavor="hive") + schema=pa.schema([("part", pa.string())]), flavor="hive") _check_dataset_roundtrip( dataset, str(target), expected_paths, 'f1', target, partitioning=partitioning_schema) @@ -4277,7 +4303,7 @@ def 
test_write_dataset_partitioned(tempdir): target / "b", target / "b" / "part-0.arrow" ] partitioning_schema = ds.partitioning( - pa.schema([("part", pa.string())])) + schema=pa.schema([("part", pa.string())])) _check_dataset_roundtrip( dataset, str(target), expected_paths, 'f1', target, partitioning=partitioning_schema) @@ -4290,6 +4316,7 @@ def test_write_dataset_with_field_names(tempdir): partitioning=["b"]) load_back = ds.dataset(tempdir, format='ipc', partitioning=["b"]) + assert isinstance(load_back, ds.FileSystemDataset) files = load_back.files partitioning_dirs = { str(pathlib.Path(f).relative_to(tempdir).parent) for f in files @@ -4307,6 +4334,7 @@ def test_write_dataset_with_field_names_hive(tempdir): partitioning=["b"], partitioning_flavor="hive") load_back = ds.dataset(tempdir, format='ipc', partitioning="hive") + assert isinstance(load_back, ds.FileSystemDataset) files = load_back.files partitioning_dirs = { str(pathlib.Path(f).relative_to(tempdir).parent) for f in files @@ -4624,7 +4652,7 @@ def test_write_dataset_max_open_files(tempdir): record_batch_3, record_batch_4]) partitioning = ds.partitioning( - pa.schema([(column_names[partition_column_id], pa.string())]), + schema=pa.schema([(column_names[partition_column_id], pa.string())]), flavor="hive") data_source_1 = directory / "default" @@ -4638,7 +4666,8 @@ def test_write_dataset_max_open_files(tempdir): def _get_compare_pair(data_source, record_batch, file_format, col_id): num_of_files_generated = _get_num_of_files_generated( base_directory=data_source, file_format=file_format) - number_of_partitions = len(pa.compute.unique(record_batch[col_id])) + unique_vals = pa.compute.unique(record_batch[col_id]) + number_of_partitions = len(unique_vals) # type: ignore[arg-type] return num_of_files_generated, number_of_partitions # CASE 1: when max_open_files=default & max_open_files >= num_of_partitions @@ -4685,7 +4714,7 @@ def test_write_dataset_partitioned_dict(tempdir): target / "a", target / "a" / 
"part-0.arrow", target / "b", target / "b" / "part-0.arrow" ] - partitioning = ds.partitioning(pa.schema([ + partitioning = ds.partitioning(schema=pa.schema([ dataset.schema.field('part')]), dictionaries={'part': pa.array(['a', 'b'])}) # NB: dictionaries required here since we use partitioning to parse @@ -4704,7 +4733,7 @@ def test_write_dataset_use_threads(tempdir): dataset = ds.dataset(directory, partitioning="hive") partitioning = ds.partitioning( - pa.schema([("part", pa.string())]), flavor="hive") + schema=pa.schema([("part", pa.string())]), flavor="hive") target1 = tempdir / 'partitioned1' paths_written = [] @@ -4744,7 +4773,7 @@ def test_write_dataset_use_threads_preserve_order(tempdir): batches = table.to_batches(max_chunksize=2) ds.write_dataset(batches, tempdir, format="parquet", use_threads=True, preserve_order=True) - seq = ds.dataset(tempdir).to_table(use_threads=False)['a'].to_numpy() + seq = ds.dataset(tempdir).to_table(use_threads=False).column('a').to_numpy() prev = -1 for item in seq: curr = int(item) @@ -4784,7 +4813,7 @@ def file_visitor(written_file): visited_sizes.append(written_file.size) partitioning = ds.partitioning( - pa.schema([("part", pa.string())]), flavor="hive") + schema=pa.schema([("part", pa.string())]), flavor="hive") ds.write_dataset(table, base_dir, format="feather", basename_template='dat_{i}.arrow', partitioning=partitioning, file_visitor=file_visitor) @@ -4896,7 +4925,7 @@ def test_write_table_partitioned_dict(tempdir): pa.array(['a'] * 10 + ['b'] * 10).dictionary_encode(), ], names=['col', 'part']) - partitioning = ds.partitioning(table.select(["part"]).schema) + partitioning = ds.partitioning(schema=table.select(["part"]).schema) base_dir = tempdir / "dataset" ds.write_dataset( @@ -4917,8 +4946,7 @@ def test_write_table_partitioned_dict(tempdir): def test_write_dataset_parquet(tempdir): table = pa.table([ pa.array(range(20), type="uint32"), - pa.array(np.arange("2012-01-01", 20, dtype="datetime64[D]").astype( - 
"datetime64[ns]")), + pa.array(pd.date_range("2012-01-01", periods=20, freq='D').values.astype("datetime64[ns]")), pa.array(np.repeat(['a', 'b'], 10)) ], names=["f1", "f2", "part"]) @@ -5014,7 +5042,7 @@ def test_partition_dataset_parquet_file_visitor(tempdir): root_path = tempdir / 'partitioned' partitioning = ds.partitioning( - pa.schema([("part", pa.string())]), flavor="hive") + schema=pa.schema([("part", pa.string())]), flavor="hive") paths_written = [] @@ -5047,11 +5075,11 @@ def test_write_dataset_arrow_schema_metadata(tempdir): # ensure we serialize ARROW schema in the parquet metadata, to have a # correct roundtrip (e.g. preserve non-UTC timezone) table = pa.table({"a": [pd.Timestamp("2012-01-01", tz="Europe/Brussels")]}) - assert table["a"].type.tz == "Europe/Brussels" + assert table.column("a").type.tz == "Europe/Brussels" ds.write_dataset(table, tempdir, format="parquet") result = pq.read_table(tempdir / "part-0.parquet") - assert result["a"].type.tz == "Europe/Brussels" + assert result.column("a").type.tz == "Europe/Brussels" def test_write_dataset_schema_metadata(tempdir): @@ -5092,7 +5120,7 @@ def test_write_dataset_s3(s3_example_simple): pa.array(['a'] * 10 + ['b'] * 10)], names=["f1", "f2", "part"] ) - part = ds.partitioning(pa.schema([("part", pa.string())]), flavor="hive") + part = ds.partitioning(schema=pa.schema([("part", pa.string())]), flavor="hive") # writing with filesystem object ds.write_dataset( @@ -5171,7 +5199,7 @@ def test_write_dataset_s3_put_only(s3_server): pa.array(['a']*10 + ['b'] * 10)], names=["f1", "f2", "part"] ) - part = ds.partitioning(pa.schema([("part", pa.string())]), flavor="hive") + part = ds.partitioning(schema=pa.schema([("part", pa.string())]), flavor="hive") # writing with filesystem object with create_dir flag set to false ds.write_dataset( @@ -5549,7 +5577,7 @@ def test_union_dataset_filter(tempdir, dstype): else: raise NotImplementedError - filtered_union_ds = ds.dataset((ds1, ds2)).filter( + filtered_union_ds = 
ds.dataset((ds1, ds2)).filter( # type: ignore[arg-type] (pc.field("colA") < 3) | (pc.field("colA") == 9) ) assert filtered_union_ds.to_table() == pa.table({ @@ -5571,7 +5599,7 @@ def test_union_dataset_filter(tempdir, dstype): filtered_ds2 = ds2.filter(pc.field("colA") < 10) with pytest.raises(ValueError, match="currently not supported"): - ds.dataset((filtered_ds1, filtered_ds2)) + ds.dataset((filtered_ds1, filtered_ds2)) # type: ignore[arg-type] def test_parquet_dataset_filter(tempdir): @@ -5672,8 +5700,9 @@ def test_dataset_partition_with_slash(tmpdir): assert dt_table == read_table.sort_by("exp_id") exp_meta = dt_table.column(1).to_pylist() - exp_meta = sorted(set(exp_meta)) # take unique - encoded_paths = ["exp_meta=" + quote(path, safe='') for path in exp_meta] + exp_meta = sorted(set(exp_meta), key=lambda x: ( + x is None, x)) # take unique, handle None + encoded_paths = ["exp_meta=" + quote(str(path), safe='') for path in exp_meta] file_paths = sorted(os.listdir(path)) assert encoded_paths == file_paths @@ -5756,6 +5785,7 @@ def test_write_dataset_write_page_index(tempdir): ) ds1 = ds.dataset(base_dir, format="parquet") + assert isinstance(ds1, ds.FileSystemDataset) for file in ds1.files: # Can retrieve sorting columns from metadata metadata = pq.read_metadata(file) @@ -5898,13 +5928,13 @@ def test_make_write_options_error(): "'pyarrow._dataset_parquet.ParquetFileFormat' objects " "doesn't apply to a 'int'") with pytest.raises(TypeError) as excinfo: - pa.dataset.ParquetFileFormat.make_write_options(43) + pa.dataset.ParquetFileFormat.make_write_options(43) # type: ignore assert msg_1 in str(excinfo.value) or msg_2 in str(excinfo.value) pformat = pa.dataset.ParquetFileFormat() msg = "make_write_options\\(\\) takes exactly 0 positional arguments" with pytest.raises(TypeError, match=msg): - pformat.make_write_options(43) + pformat.make_write_options(43) # type: ignore def test_scanner_from_substrait(dataset): @@ -5939,3 +5969,4 @@ def 
test_scanner_from_substrait(dataset): filter=ps.BoundExpressions.from_substrait(filtering) ).to_table() assert result.to_pydict() == {'str': ['4', '4']} +# Type stubs fixes applied diff --git a/python/pyarrow/tests/test_dataset_encryption.py b/python/pyarrow/tests/test_dataset_encryption.py index 0ef3931a4cf6..3d6583523722 100644 --- a/python/pyarrow/tests/test_dataset_encryption.py +++ b/python/pyarrow/tests/test_dataset_encryption.py @@ -30,8 +30,8 @@ import pyarrow.parquet as pq import pyarrow.dataset as ds except ImportError: - pq = None - ds = None + pq = None # type: ignore[assignment] + ds = None # type: ignore[assignment] try: from pyarrow.tests.parquet.encryption import InMemoryKmsClient @@ -85,7 +85,7 @@ def create_encryption_config(footer_key=FOOTER_KEY_NAME, column_keys=COLUMN_KEYS def create_decryption_config(): - return pe.DecryptionConfiguration(cache_lifetime=300) + return pe.DecryptionConfiguration(cache_lifetime=timedelta(seconds=300)) def create_kms_connection_config(keys=KEYS): @@ -135,6 +135,8 @@ def assert_decrypts( encrypt_kms_connection_config = create_kms_connection_config(write_keys) decrypt_kms_connection_config = create_kms_connection_config(read_keys) + assert ds is not None + assert pe is not None crypto_factory = pe.CryptoFactory(kms_factory) parquet_encryption_cfg = ds.ParquetEncryptionConfig( crypto_factory, encrypt_kms_connection_config, encryption_config @@ -370,11 +372,12 @@ def test_large_row_encryption_decryption(): """Test encryption and decryption of a large number of rows.""" class NoOpKmsClient(pe.KmsClient): - def wrap_key(self, key_bytes: bytes, _: str) -> bytes: + def wrap_key(self, key_bytes: bytes, _: str) -> bytes: # type: ignore[override] b = base64.b64encode(key_bytes) return b - def unwrap_key(self, wrapped_key: bytes, _: str) -> bytes: + def unwrap_key(self, wrapped_key: bytes, _: str # type: ignore[override] + ) -> bytes: b = base64.b64decode(wrapped_key) return b @@ -395,6 +398,9 @@ def unwrap_key(self, 
wrapped_key: bytes, _: str) -> bytes: plaintext_footer=False, data_key_length_bits=128, ) + assert ds is not None + assert pe is not None + assert pq is not None pqe_config = ds.ParquetEncryptionConfig( crypto_factory, kms_config, encryption_config ) @@ -429,6 +435,9 @@ def unwrap_key(self, wrapped_key: bytes, _: str) -> bytes: encryption_unavailable, reason="Parquet Encryption is not currently enabled" ) def test_dataset_encryption_with_selected_column_statistics(): + assert ds is not None + assert pq is not None + table = create_sample_table() encryption_config = create_encryption_config() @@ -472,7 +481,7 @@ def test_dataset_encryption_with_selected_column_statistics(): for fragment in dataset.get_fragments(): decryption_properties = crypto_factory.file_decryption_properties( - kms_connection_config, decryption_config, fragment.path, mockfs) + kms_connection_config, decryption_config, fragment.path, mockfs) # type: ignore[call-arg] with pq.ParquetFile( fragment.path, decryption_properties=decryption_properties, @@ -481,12 +490,14 @@ def test_dataset_encryption_with_selected_column_statistics(): for rg_idx in range(parquet_file.metadata.num_row_groups): row_group = parquet_file.metadata.row_group(rg_idx) - assert row_group.column(0).statistics is not None - assert row_group.column(0).statistics.min == 2019 - assert row_group.column(0).statistics.max == 2022 + stats0 = row_group.column(0).statistics + assert stats0 is not None + assert stats0.min == 2019 + assert stats0.max == 2022 - assert row_group.column(1).statistics is not None - assert row_group.column(1).statistics.min == 2 - assert row_group.column(1).statistics.max == 100 + stats1 = row_group.column(1).statistics + assert stats1 is not None + assert stats1.min == 2 + assert stats1.max == 100 assert row_group.column(2).statistics is None