From 48b59e5988e1d4cb70d53682cc6c86f3f63291b8 Mon Sep 17 00:00:00 2001 From: Mohammad Linjawi Date: Tue, 28 Apr 2026 12:36:18 +0300 Subject: [PATCH 1/3] [VL][Delta] Add native Delta DV reader support --- cpp/velox/CMakeLists.txt | 7 +- cpp/velox/compute/VeloxBackend.cc | 7 + cpp/velox/compute/VeloxBackend.h | 4 + cpp/velox/compute/VeloxConnectorIds.h | 2 + cpp/velox/compute/VeloxRuntime.cc | 13 + cpp/velox/compute/WholeStageResultIterator.cc | 1 + cpp/velox/compute/delta/DeltaConnector.cpp | 48 ++++ cpp/velox/compute/delta/DeltaConnector.h | 70 +++++ cpp/velox/compute/delta/DeltaDataSource.cpp | 97 +++++++ cpp/velox/compute/delta/DeltaDataSource.h | 78 ++++++ .../delta/DeltaDeletionVectorReader.cpp | 209 ++++++++++++++ .../compute/delta/DeltaDeletionVectorReader.h | 106 +++++++ cpp/velox/compute/delta/DeltaSplit.cpp | 77 ++++++ cpp/velox/compute/delta/DeltaSplit.h | 151 ++++++++++ cpp/velox/compute/delta/DeltaSplitReader.cpp | 261 ++++++++++++++++++ cpp/velox/compute/delta/DeltaSplitReader.h | 119 ++++++++ cpp/velox/compute/delta/tests/CMakeLists.txt | 13 + .../delta/tests/DeltaConnectorTest.cpp | 86 ++++++ .../tests/DeltaDeletionVectorReaderTest.cpp | 219 +++++++++++++++ .../compute/delta/tests/DeltaSplitTest.cpp | 96 +++++++ 20 files changed, 1663 insertions(+), 1 deletion(-) create mode 100644 cpp/velox/compute/delta/DeltaConnector.cpp create mode 100644 cpp/velox/compute/delta/DeltaConnector.h create mode 100644 cpp/velox/compute/delta/DeltaDataSource.cpp create mode 100644 cpp/velox/compute/delta/DeltaDataSource.h create mode 100644 cpp/velox/compute/delta/DeltaDeletionVectorReader.cpp create mode 100644 cpp/velox/compute/delta/DeltaDeletionVectorReader.h create mode 100644 cpp/velox/compute/delta/DeltaSplit.cpp create mode 100644 cpp/velox/compute/delta/DeltaSplit.h create mode 100644 cpp/velox/compute/delta/DeltaSplitReader.cpp create mode 100644 cpp/velox/compute/delta/DeltaSplitReader.h create mode 100644 
cpp/velox/compute/delta/tests/DeltaConnectorTest.cpp create mode 100644 cpp/velox/compute/delta/tests/DeltaDeletionVectorReaderTest.cpp create mode 100644 cpp/velox/compute/delta/tests/DeltaSplitTest.cpp diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt index 5034c1601abc..ad2056877c81 100644 --- a/cpp/velox/CMakeLists.txt +++ b/cpp/velox/CMakeLists.txt @@ -159,6 +159,11 @@ set(VELOX_SRCS compute/VeloxRuntime.cc compute/VeloxPlanConverter.cc compute/WholeStageResultIterator.cc + compute/delta/DeltaConnector.cpp + compute/delta/DeltaDataSource.cpp + compute/delta/DeltaDeletionVectorReader.cpp + compute/delta/DeltaSplit.cpp + compute/delta/DeltaSplitReader.cpp compute/delta/RoaringBitmapArray.cpp compute/iceberg/IcebergPlanConverter.cc jni/JniFileSystem.cc @@ -390,8 +395,8 @@ find_package( target_link_libraries(velox PUBLIC ICU::i18n ICU::uc ICU::data) if(BUILD_TESTS) - add_subdirectory(tests) add_subdirectory(compute/delta/tests) + add_subdirectory(tests) endif() if(BUILD_BENCHMARKS) diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc index 85a8622508a1..801fc9d8358d 100644 --- a/cpp/velox/compute/VeloxBackend.cc +++ b/cpp/velox/compute/VeloxBackend.cc @@ -21,6 +21,7 @@ #include #include +#include "compute/delta/DeltaConnector.h" #include "operators/functions/RegistrationAllFunctions.h" #include "operators/plannodes/RowVectorStream.h" #include "utils/ConfigExtractor.h" @@ -323,6 +324,12 @@ std::shared_ptr VeloxBackend::createHiveC return std::make_shared(connectorId, hiveConnectorConfig_, ioExecutor); } +std::shared_ptr VeloxBackend::createDeltaConnector( + const std::string& connectorId, + folly::Executor* ioExecutor) const { + return std::make_shared(connectorId, hiveConnectorConfig_, ioExecutor); +} + std::shared_ptr VeloxBackend::createValueStreamConnector( const std::string& connectorId, bool dynamicFilterEnabled) const { diff --git a/cpp/velox/compute/VeloxBackend.h b/cpp/velox/compute/VeloxBackend.h index 
68791ec0f995..2796ce20c3cc 100644 --- a/cpp/velox/compute/VeloxBackend.h +++ b/cpp/velox/compute/VeloxBackend.h @@ -74,6 +74,10 @@ class VeloxBackend { const std::string& connectorId, folly::Executor* ioExecutor) const; + std::shared_ptr createDeltaConnector( + const std::string& connectorId, + folly::Executor* ioExecutor) const; + std::shared_ptr createValueStreamConnector( const std::string& connectorId, bool dynamicFilterEnabled) const; diff --git a/cpp/velox/compute/VeloxConnectorIds.h b/cpp/velox/compute/VeloxConnectorIds.h index e6082bae8bdf..a0e37ba8b600 100644 --- a/cpp/velox/compute/VeloxConnectorIds.h +++ b/cpp/velox/compute/VeloxConnectorIds.h @@ -23,9 +23,11 @@ namespace gluten { struct VeloxConnectorIds { std::string hive; + std::string delta; std::string iterator; std::string cudfHive; bool hiveRegistered{false}; + bool deltaRegistered{false}; bool iteratorRegistered{false}; bool cudfHiveRegistered{false}; }; diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index 1e2cc6f3082e..0fa35f5d74c0 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -31,6 +31,7 @@ #include "compute/ResultIterator.h" #include "compute/Runtime.h" #include "compute/VeloxPlanConverter.h" +#include "compute/delta/DeltaConnector.h" #include "config/VeloxConfig.h" #include "operators/plannodes/IteratorSplit.h" #include "operators/serializer/VeloxRowToColumnarConverter.h" @@ -213,6 +214,7 @@ std::string makeScopedConnectorId(const std::string& base, uint64_t runtimeId) { VeloxConnectorIds makeScopedConnectorIds(uint64_t runtimeId) { return VeloxConnectorIds{ .hive = makeScopedConnectorId(kHiveConnectorId, runtimeId), + .delta = makeScopedConnectorId(delta::DeltaConnectorFactory::kDeltaConnectorName, runtimeId), .iterator = makeScopedConnectorId(kIteratorConnectorId, runtimeId), .cudfHive = makeScopedConnectorId(kCudfHiveConnectorId, runtimeId)}; } @@ -271,6 +273,13 @@ void VeloxRuntime::registerConnectors() { 
velox::connector::hasConnector(connectorIds_.hive), "Scoped hive connector not found after registration: " + connectorIds_.hive); + connectorIds_.deltaRegistered = + velox::connector::registerConnector(backend->createDeltaConnector(connectorIds_.delta, ioExecutor_.get())); + GLUTEN_CHECK(connectorIds_.deltaRegistered, "Failed to register scoped delta connector: " + connectorIds_.delta); + GLUTEN_CHECK( + velox::connector::hasConnector(connectorIds_.delta), + "Scoped delta connector not found after registration: " + connectorIds_.delta); + const auto valueStreamDynamicFilterEnabled = veloxCfg_->get(kValueStreamDynamicFilterEnabled, kValueStreamDynamicFilterEnabledDefault); connectorIds_.iteratorRegistered = velox::connector::registerConnector( @@ -306,6 +315,10 @@ void VeloxRuntime::unregisterConnectors() { velox::connector::unregisterConnector(connectorIds_.iterator); connectorIds_.iteratorRegistered = false; } + if (connectorIds_.deltaRegistered) { + velox::connector::unregisterConnector(connectorIds_.delta); + connectorIds_.deltaRegistered = false; + } if (connectorIds_.hiveRegistered) { velox::connector::unregisterConnector(connectorIds_.hive); connectorIds_.hiveRegistered = false; diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 2c1effba20c3..ccc1917f417e 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -216,6 +216,7 @@ std::shared_ptr WholeStageResultIterator::createNewVeloxQ std::unordered_map> connectorConfigs; auto hiveSessionConfig = createHiveConnectorSessionConfig(veloxCfg_); connectorConfigs[connectorIds_.hive] = hiveSessionConfig; + connectorConfigs[connectorIds_.delta] = hiveSessionConfig; connectorConfigs[connectorIds_.iterator] = hiveSessionConfig; #ifdef GLUTEN_ENABLE_GPU if (!connectorIds_.cudfHive.empty()) { diff --git a/cpp/velox/compute/delta/DeltaConnector.cpp b/cpp/velox/compute/delta/DeltaConnector.cpp new file 
mode 100644 index 000000000000..f25eb7714a11 --- /dev/null +++ b/cpp/velox/compute/delta/DeltaConnector.cpp @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "compute/delta/DeltaConnector.h" + +#include "compute/delta/DeltaDataSource.h" + +namespace gluten::delta { + +std::unique_ptr DeltaConnector::createDataSource( + const RowTypePtr& outputType, + const ConnectorTableHandlePtr& tableHandle, + const ColumnHandleMap& columnHandles, + ConnectorQueryCtx* connectorQueryCtx) { + return std::make_unique( + outputType, tableHandle, columnHandles, &fileHandleFactory_, ioExecutor_, connectorQueryCtx, hiveConfig_); +} + +} // namespace gluten::delta diff --git a/cpp/velox/compute/delta/DeltaConnector.h b/cpp/velox/compute/delta/DeltaConnector.h new file mode 100644 index 000000000000..2d3a1d8df60c --- /dev/null +++ b/cpp/velox/compute/delta/DeltaConnector.h @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/connectors/hive/HiveConnector.h" + +namespace gluten::delta { + +using namespace facebook::velox; +using namespace facebook::velox::connector; +using namespace facebook::velox::connector::hive; + +class DeltaConnector final : public HiveConnector { + public: + DeltaConnector(const std::string& id, std::shared_ptr config, folly::Executor* ioExecutor) + : HiveConnector(id, std::move(config), ioExecutor) {} + + std::unique_ptr createDataSource( + const RowTypePtr& outputType, + const ConnectorTableHandlePtr& tableHandle, + const ColumnHandleMap& columnHandles, + ConnectorQueryCtx* connectorQueryCtx) override; +}; + +class DeltaConnectorFactory final : public ConnectorFactory { + public: + static constexpr const char* kDeltaConnectorName = "delta"; + + DeltaConnectorFactory() : ConnectorFactory(kDeltaConnectorName) {} + + std::shared_ptr newConnector( + const std::string& id, + std::shared_ptr config, + folly::Executor* ioExecutor = nullptr, + [[maybe_unused]] folly::Executor* cpuExecutor = nullptr) override { + return std::make_shared(id, std::move(config), ioExecutor); + } +}; + +} // namespace gluten::delta diff --git a/cpp/velox/compute/delta/DeltaDataSource.cpp b/cpp/velox/compute/delta/DeltaDataSource.cpp new file mode 100644 index 000000000000..e7ec73007b99 --- /dev/null +++ b/cpp/velox/compute/delta/DeltaDataSource.cpp @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "compute/delta/DeltaDataSource.h" + +#include "compute/delta/DeltaSplitReader.h" + +namespace gluten::delta { + +DeltaDataSource::DeltaDataSource( + const RowTypePtr& outputType, + const ConnectorTableHandlePtr& tableHandle, + const ColumnHandleMap& assignments, + FileHandleFactory* fileHandleFactory, + folly::Executor* ioExecutor, + const ConnectorQueryCtx* connectorQueryCtx, + const std::shared_ptr& hiveConfig) + : HiveDataSource( + outputType, + tableHandle, + assignments, + fileHandleFactory, + ioExecutor, + connectorQueryCtx, + hiveConfig) {} + +#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER +std::unique_ptr DeltaDataSource::createSplitReader() { + auto bucketChannels = prepareSplit(); + auto deltaSplit = checkedPointerCast(split_); + + return std::make_unique( + deltaSplit, + tableHandle_, + &partitionKeys_, + connectorQueryCtx_, + fileConfig_, + readerOutputType_, + ioStatistics_, + ioStats_, + fileHandleFactory_, + ioExecutor_, + scanSpec_, + &infoColumns_, + std::move(bucketChannels), + /*subfieldFiltersForValidation=*/getFilters()); +} +#else +std::unique_ptr DeltaDataSource::createSplitReader() { + auto deltaSplit = checkedPointerCast(split_); + + return std::make_unique( + deltaSplit, + hiveTableHandle_, + &partitionKeys_, + connectorQueryCtx_, + hiveConfig_, + readerOutputType_, + ioStatistics_, + ioStats_, + fileHandleFactory_, + ioExecutor_, + scanSpec_, + /*subfieldFiltersForValidation=*/getFilters()); +} +#endif + +} // namespace gluten::delta diff --git a/cpp/velox/compute/delta/DeltaDataSource.h b/cpp/velox/compute/delta/DeltaDataSource.h new file mode 100644 index 000000000000..a9fe1a47534b --- /dev/null +++ b/cpp/velox/compute/delta/DeltaDataSource.h @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include "velox/connectors/hive/HiveDataSource.h" + +#ifndef GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER +#if __has_include("velox/connectors/hive/FileSplitReader.h") +#define GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER 1 +#else +#define GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER 0 +#endif +#endif + +#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER +#include "velox/connectors/hive/FileSplitReader.h" +#else +namespace facebook::velox::connector::hive { +class SplitReader; +} +#endif + +namespace gluten::delta { + +using namespace facebook::velox; +using namespace facebook::velox::connector; +using namespace facebook::velox::connector::hive; + +class DeltaDataSource : public HiveDataSource { + public: + DeltaDataSource( + const RowTypePtr& outputType, + const ConnectorTableHandlePtr& tableHandle, + const ColumnHandleMap& assignments, + FileHandleFactory* fileHandleFactory, + folly::Executor* ioExecutor, + const ConnectorQueryCtx* connectorQueryCtx, + const std::shared_ptr& hiveConfig); + + protected: +#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER + std::unique_ptr createSplitReader() override; +#else + std::unique_ptr createSplitReader() override; +#endif +}; + +} // namespace gluten::delta diff --git a/cpp/velox/compute/delta/DeltaDeletionVectorReader.cpp b/cpp/velox/compute/delta/DeltaDeletionVectorReader.cpp new file mode 100644 index 000000000000..a48060d06061 --- /dev/null +++ b/cpp/velox/compute/delta/DeltaDeletionVectorReader.cpp @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "compute/delta/DeltaDeletionVectorReader.h" + +#include +#include "velox/common/base/BitUtil.h" +#include "velox/common/base/Exceptions.h" + +namespace gluten::delta { + +namespace { + +constexpr uint64_t kDeltaBitmapArrayMagicBytes = 4; +constexpr uint64_t kDeltaNativeBitmapArrayLengthBytes = 4; +constexpr uint64_t kDeltaStoredPayloadLengthBytes = 4; +constexpr uint32_t kDeltaPortableBitmapArrayMagicNumber = 1681511377; +constexpr uint32_t kDeltaNativeBitmapArrayMagicNumber = 1681511376; + +uint32_t readUint32LittleEndian(const char* data) { + const auto* bytes = reinterpret_cast(data); + return static_cast(bytes[0]) | (static_cast(bytes[1]) << 8) | + (static_cast(bytes[2]) << 16) | (static_cast(bytes[3]) << 24); +} + +uint64_t readUint64LittleEndian(const char* data) { + const auto* bytes = reinterpret_cast(data); + return static_cast(bytes[0]) | (static_cast(bytes[1]) << 8) | + (static_cast(bytes[2]) << 16) | (static_cast(bytes[3]) << 24) | + (static_cast(bytes[4]) << 32) | (static_cast(bytes[5]) << 40) | + (static_cast(bytes[6]) << 48) | (static_cast(bytes[7]) << 56); +} + +roaring::Roaring64Map deserializeDeltaBitmapArray(std::string_view serializedPayload, const std::string& dvPath) { + VELOX_USER_CHECK_GE( + serializedPayload.size(), + kDeltaBitmapArrayMagicBytes, + "Deletion vector payload is too small for Delta bitmap array: {}", + dvPath); + + const auto magic = readUint32LittleEndian(serializedPayload.data()); + if (magic == kDeltaPortableBitmapArrayMagicNumber) { + const auto portablePayload = serializedPayload.substr(kDeltaBitmapArrayMagicBytes); + return roaring::Roaring64Map::readSafe(portablePayload.data(), portablePayload.size()); + } + + if (magic == kDeltaNativeBitmapArrayMagicNumber) { + VELOX_USER_CHECK_GE( + serializedPayload.size(), + kDeltaBitmapArrayMagicBytes + kDeltaNativeBitmapArrayLengthBytes, + "Deletion vector payload is too small for Delta native bitmap array: {}", + dvPath); + + const auto bitmapCount = 
readUint32LittleEndian(serializedPayload.data() + kDeltaBitmapArrayMagicBytes); + size_t offset = kDeltaBitmapArrayMagicBytes + kDeltaNativeBitmapArrayLengthBytes; + roaring::Roaring64Map result; + + for (uint64_t bitmapIndex = 0; bitmapIndex < bitmapCount; ++bitmapIndex) { + VELOX_USER_CHECK_LE( + offset + kDeltaStoredPayloadLengthBytes, + serializedPayload.size(), + "Deletion vector payload ended before bitmap {} size for {}", + bitmapIndex, + dvPath); + + const auto bitmapSize = readUint32LittleEndian(serializedPayload.data() + offset); + offset += kDeltaStoredPayloadLengthBytes; + + VELOX_USER_CHECK_LE( + offset + bitmapSize, + serializedPayload.size(), + "Deletion vector bitmap {} range exceeds payload size for {}", + bitmapIndex, + dvPath); + + auto bitmap = roaring::Roaring::readSafe(serializedPayload.data() + offset, bitmapSize); + VELOX_USER_CHECK_EQ( + bitmap.getSizeInBytes(true), + bitmapSize, + "Deletion vector bitmap {} size mismatch for {}: expected {}, got {}", + bitmapIndex, + dvPath, + bitmapSize, + bitmap.getSizeInBytes(true)); + + const uint64_t rowBase = bitmapIndex << 32; + for (auto it = bitmap.begin(); it != bitmap.end(); ++it) { + result.add(rowBase | static_cast(*it)); + } + offset += bitmapSize; + } + + VELOX_USER_CHECK_EQ( + offset, + serializedPayload.size(), + "Deletion vector payload has {} unexpected trailing bytes for {}", + serializedPayload.size() - offset, + dvPath); + return result; + } + + VELOX_USER_FAIL("Unexpected Delta bitmap array magic number {} for {}", magic, dvPath); +} + +} // namespace + +void DeltaDeletionVectorReader::loadSerializedDeletionVectorInternal( + std::string_view serializedPayload, + const std::string& debugName, + std::optional expectedCardinality) { + VELOX_USER_CHECK_GT(serializedPayload.size(), 0, "Serialized deletion vector is empty: {}", debugName); + + deletionBitmap_ = deserializeDeltaBitmapArray(serializedPayload, debugName); + + if (expectedCardinality.has_value()) { + const auto 
actualCardinality = deletionBitmap_->cardinality(); + VELOX_USER_CHECK_EQ( + actualCardinality, + expectedCardinality.value(), + "Deletion vector cardinality mismatch for {}: expected {}, got {}", + debugName, + expectedCardinality.value(), + actualCardinality); + } +} + +void DeltaDeletionVectorReader::loadSerializedDeletionVector( + std::string_view serializedPayload, + std::optional expectedCardinality) { + try { + loadSerializedDeletionVectorInternal(serializedPayload, "serialized deletion vector", expectedCardinality); + } catch (const std::exception& e) { + VELOX_USER_FAIL("Failed to load serialized deletion vector: {}", e.what()); + } +} + +bool DeltaDeletionVectorReader::isRowDeleted(uint64_t rowPosition) { + if (!deletionBitmap_.has_value()) { + return false; + } + + return deletionBitmap_->contains(rowPosition); +} + +void DeltaDeletionVectorReader::applyDeletionFilter(uint64_t baseReadOffset, uint64_t size, BufferPtr deleteBitmap) { + VELOX_CHECK_NOT_NULL(deleteBitmap, "Delete bitmap buffer is required"); + + if (!deletionBitmap_.has_value()) { + std::memset(deleteBitmap->asMutable(), 0, bits::nbytes(size)); + deleteBitmap->setSize(0); + return; + } + + auto* rawBitmap = deleteBitmap->asMutable(); + std::memset(rawBitmap, 0, bits::nbytes(size)); + + bool hasDeletedRows = false; + uint64_t highestDeletedIndex = 0; + for (uint64_t i = 0; i < size; ++i) { + const uint64_t absoluteRowPos = baseReadOffset + i; + if (deletionBitmap_->contains(absoluteRowPos)) { + bits::setBit(rawBitmap, i); + hasDeletedRows = true; + highestDeletedIndex = i; + } + } + + deleteBitmap->setSize(hasDeletedRows ? 
bits::nbytes(highestDeletedIndex + 1) : 0); +} + +uint64_t DeltaDeletionVectorReader::estimatedDeletedRowCount() const { + if (!deletionBitmap_.has_value()) { + return 0; + } + + // Return actual cardinality instead of estimated size + return deletionBitmap_->cardinality(); +} + +} // namespace gluten::delta diff --git a/cpp/velox/compute/delta/DeltaDeletionVectorReader.h b/cpp/velox/compute/delta/DeltaDeletionVectorReader.h new file mode 100644 index 000000000000..f6ae7d701728 --- /dev/null +++ b/cpp/velox/compute/delta/DeltaDeletionVectorReader.h @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/common/base/BitUtil.h" +#include "velox/vector/ComplexVector.h" + +#include +#include +#include +#include +#include + +namespace gluten::delta { + +using namespace facebook::velox; + +/// Reads and manages Delta Lake deletion vectors for filtering deleted rows +/// during table scans. +/// +/// The JVM Delta side materializes the deletion vector and hands the serialized +/// bitmap payload to native. This reader only deserializes that payload and +/// applies row filtering during scan. +/// +/// Usage example: +/// @code +/// auto reader = std::make_unique<DeltaDeletionVectorReader>(); +/// reader->loadSerializedDeletionVector(serializedPayload, expectedCardinality); +/// if (reader->isRowDeleted(42)) { +/// // Skip this row during scan +/// } +/// @endcode +class DeltaDeletionVectorReader { + public: + DeltaDeletionVectorReader() = default; + + /// Loads a deletion vector from an already decoded serialized Delta payload. + void loadSerializedDeletionVector( + std::string_view serializedPayload, + std::optional expectedCardinality = std::nullopt); + + /// Checks if a specific row position is marked as deleted. + /// Note: Declared non-const, although the current implementation only reads + /// the loaded bitmap. + /// @param rowPosition 0-based row position in the data file + /// @return true if the row is deleted, false otherwise + bool isRowDeleted(uint64_t rowPosition); + + /// Applies deletion filter to a batch of rows, updating the deletion bitmap. + /// This is called during scan to mark deleted rows in the output bitmap.
+ /// @param baseReadOffset Starting row position for this batch (absolute) + /// @param size Number of rows in the batch + /// @param deleteBitmap Output bitmap marking deleted rows (1 = deleted, 0 = + /// keep) + void applyDeletionFilter(uint64_t baseReadOffset, uint64_t size, BufferPtr deleteBitmap); + + /// Returns true if no deletion vector is loaded. + bool empty() const { + return !deletionBitmap_.has_value(); + } + + /// Returns the number of deleted rows in the loaded DV (0 if none is loaded). + /// Note: Despite the name, this is the exact cardinality of the bitmap. + uint64_t estimatedDeletedRowCount() const; + + private: + void loadSerializedDeletionVectorInternal( + std::string_view serializedPayload, + const std::string& debugName, + std::optional expectedCardinality); + + // The loaded deletion vector bitmap + std::optional deletionBitmap_; +}; + +} // namespace gluten::delta diff --git a/cpp/velox/compute/delta/DeltaSplit.cpp b/cpp/velox/compute/delta/DeltaSplit.cpp new file mode 100644 index 000000000000..4e0d119b1c02 --- /dev/null +++ b/cpp/velox/compute/delta/DeltaSplit.cpp @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Copyright (c) Facebook, Inc. and its affiliates.
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "compute/delta/DeltaSplit.h" + +namespace gluten::delta { + +HiveDeltaSplit::HiveDeltaSplit( + const std::string& connectorId, + const std::string& filePath, + dwio::common::FileFormat fileFormat, + uint64_t start, + uint64_t length, + const std::unordered_map>& partitionKeys, + std::optional tableBucketNumber, + const std::unordered_map& customSplitInfo, + const std::shared_ptr& extraFileInfo, + const std::unordered_map& serdeParameters, + bool cacheable, + std::optional deletionVector, + std::optional protocolInfo, + std::optional statistics, + DeltaRowIndexFilterType filterType, + const std::unordered_map& infoColumns, + std::optional fileProperties) + : HiveConnectorSplit( + connectorId, + filePath, + fileFormat, + start, + length, + partitionKeys, + tableBucketNumber, + customSplitInfo, + extraFileInfo, + serdeParameters, + /*splitWeight=*/0, + cacheable, + infoColumns, + fileProperties, + std::nullopt, + std::nullopt), + deletionVector(std::move(deletionVector)), + protocolInfo(std::move(protocolInfo)), + statistics(std::move(statistics)), + filterType(filterType) {} + +} // namespace gluten::delta diff --git a/cpp/velox/compute/delta/DeltaSplit.h b/cpp/velox/compute/delta/DeltaSplit.h new file mode 100644 index 000000000000..c1e34e27a9bd --- /dev/null +++ b/cpp/velox/compute/delta/DeltaSplit.h @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + 
* contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include
+#include
+#include
+#include
+
+#include "compute/Runtime.h"
+#include "velox/connectors/hive/HiveConnectorSplit.h"
+
+namespace gluten::delta {
+
+// NOTE(review): these using-directives live in a header, so every file that
+// includes it gets the Velox names injected into gluten::delta.
+using namespace facebook::velox;
+using namespace facebook::velox::connector;
+using namespace facebook::velox::connector::hive;
+
+/// Selects how a split's deletion-vector bitmap is turned into the set of rows
+/// to drop. DeltaSplitReader::next() negates the bitmap for kIfNotContained and
+/// uses it as produced by the deletion vector reader for the other two values.
+/// NOTE(review): presumably mirrors Delta's RowIndexFilterType - confirm the
+/// intended meaning of kKeepAll against the JVM side.
+enum class DeltaRowIndexFilterType {
+  kKeepAll,
+  // Rows whose index is in the deletion vector are dropped.
+  kIfContained,
+  // Rows whose index is NOT in the deletion vector are dropped.
+  kIfNotContained,
+};
+
+/// Protocol version information for a Delta table.
+/// Used to validate that the table supports deletion vectors.
+/// Per Delta spec: DVs require Reader v3+ and Writer v7+ with
+/// 'deletionVectors' feature flag.
+struct DeltaProtocolInfo {
+  /// Minimum reader/writer protocol versions. Zero-initialised so that a
+  /// default-constructed value never reports deletion-vector support because
+  /// of indeterminate contents.
+  int32_t minReaderVersion{0};
+  int32_t minWriterVersion{0};
+  /// Table feature names; std::nullopt when the feature list was not provided.
+  std::optional> readerFeatures;
+  std::optional> writerFeatures;
+
+  /// Check if this protocol supports deletion vectors.
+  /// Returns true if:
+  /// - minReaderVersion >= 3
+  /// - minWriterVersion >= 7
+  /// - 'deletionVectors' is in readerFeatures
+  /// writerFeatures is not consulted.
+  bool supportsDeletionVectors() const {
+    if (minReaderVersion < 3 || minWriterVersion < 7) {
+      return false;
+    }
+    if (!readerFeatures.has_value()) {
+      return false;
+    }
+    return std::find(readerFeatures->begin(), readerFeatures->end(), "deletionVectors") != readerFeatures->end();
+  }
+};
+
+/// Describes the deletion vector attached to a split: an optional row
+/// cardinality and an optional view of the serialized bitmap payload.
+struct DeltaDeletionVectorDescriptor {
+  std::optional cardinality;
+  std::optional serializedPayloadView;
+
+  /// Builds a descriptor from the given cardinality and payload view; both
+  /// default to std::nullopt.
+  static DeltaDeletionVectorDescriptor serialized(
+      std::optional cardinality = std::nullopt,
+      std::optional serializedPayloadView = std::nullopt) {
+    return {cardinality, serializedPayloadView};
+  }
+
+  /// True when a serialized payload view is attached to the descriptor.
+  bool hasMaterializedPayload() const {
+    return serializedPayloadView.has_value();
+  }
+};
+
+/// File-level statistics for a Delta data file.
+/// Used to validate consistency with deletion vectors and
+/// calculate logical row counts.
+struct DeltaFileStatistics {
+  /// Physical number of rows in the Parquet file.
+  /// Required when deletion vector is present (per Delta spec).
+  std::optional numRecords;
+
+  /// Whether column statistics (min/max) are tight bounds.
+  /// - true: min/max values exist in the valid (non-deleted) rows
+  /// - false: min/max are bounds only, may not exist in valid rows
+  /// When false with a DV, statistics may be stale and unsuitable
+  /// for aggregations like max(column).
+  std::optional tightBounds;
+
+  /// Calculate the logical row count accounting for deletion vectors.
+  /// Returns the number of valid (non-deleted) rows.
+  /// Returns -1 if numRecords is not available.
+  /// Returns 0 when the deletion vector cardinality is larger than numRecords
+  /// (inconsistent metadata), so that a negative difference can never be
+  /// mistaken for the -1 "unknown" sentinel.
+  int64_t logicalRowCount(const std::optional& dv) const {
+    if (!numRecords.has_value()) {
+      return -1; // Unknown
+    }
+    if (!dv.has_value() || !dv->cardinality.has_value()) {
+      return *numRecords; // No deletions
+    }
+    const auto deletedRows = static_cast<int64_t>(*dv->cardinality);
+    // deletedRows < 0 only happens when the cast wrapped an out-of-range value.
+    if (deletedRows < 0 || deletedRows > *numRecords) {
+      return 0;
+    }
+    return *numRecords - deletedRows;
+  }
+};
+
+/// Hive connector split extended with the Delta-specific metadata that
+/// DeltaSplitReader needs to apply a deletion vector.
+struct HiveDeltaSplit : public connector::hive::HiveConnectorSplit {
+  /// Deletion vector for the data file, if any.
+  std::optional deletionVector;
+  /// Table protocol, validated by the reader when a deletion vector is present.
+  std::optional protocolInfo;
+  /// File statistics, validated against the deletion vector when present.
+  std::optional statistics;
+  /// How the deletion-vector bitmap is interpreted by the reader.
+  DeltaRowIndexFilterType filterType;
+
+  HiveDeltaSplit(
+      const std::string& connectorId,
+      const std::string& filePath,
+      dwio::common::FileFormat fileFormat,
+      uint64_t start = 0,
+      uint64_t length = std::numeric_limits::max(),
+      const std::unordered_map>& partitionKeys = {},
+      std::optional tableBucketNumber = std::nullopt,
+      const std::unordered_map& customSplitInfo = {},
+      const std::shared_ptr& extraFileInfo = {},
+      const std::unordered_map& serdeParameters = {},
+      bool cacheable = true,
+      std::optional deletionVector = std::nullopt,
+      std::optional protocolInfo = std::nullopt,
+      std::optional statistics = std::nullopt,
+      DeltaRowIndexFilterType filterType = DeltaRowIndexFilterType::kKeepAll,
+      const std::unordered_map& infoColumns = {},
+      std::optional fileProperties = std::nullopt);
+};
+
+} // namespace gluten::delta
diff --git a/cpp/velox/compute/delta/DeltaSplitReader.cpp b/cpp/velox/compute/delta/DeltaSplitReader.cpp
new file mode 100644
index 000000000000..1b34a5cf4280
--- /dev/null
+++ b/cpp/velox/compute/delta/DeltaSplitReader.cpp
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#include "compute/delta/DeltaSplitReader.h"
+
+#include
+#include
+
+#include "compute/delta/DeltaSplit.h"
+#include "velox/connectors/hive/HiveConfig.h"
+#include "velox/dwio/common/BufferUtil.h"
+
+using namespace facebook::velox::dwio::common;
+
+namespace gluten::delta {
+
+namespace {
+
+// Names of the file read options that carry the table protocol.
+constexpr const char* kReaderVersionOption = "spark.gluten.delta.protocol.reader.version";
+constexpr const char* kWriterVersionOption = "spark.gluten.delta.protocol.writer.version";
+constexpr const char* kReaderFeaturesOption = "spark.gluten.delta.protocol.reader.features";
+constexpr const char* kWriterFeaturesOption = "spark.gluten.delta.protocol.writer.features";
+
+/// Parses a protocol version option. Fails with a Velox user error when the
+/// value is empty, not an integer, out of range, or followed by trailing
+/// characters, instead of letting a std::stoi exception escape.
+int32_t parseProtocolVersion(const char* option, const std::string& value) {
+  size_t consumed = 0;
+  int parsed = 0;
+  try {
+    parsed = std::stoi(value, &consumed);
+  } catch (const std::exception&) {
+    // Reported through the check below.
+    consumed = 0;
+  }
+  VELOX_USER_CHECK(
+      !value.empty() && consumed == value.size(), "Invalid Delta protocol version '{}' for option {}", value, option);
+  return parsed;
+}
+
+/// Splits a comma-separated feature list. Surrounding blanks are trimmed and
+/// blank entries are dropped so that "a, deletionVectors" still matches.
+std::vector<std::string> parseFeatureList(const std::string& csv) {
+  std::vector<std::string> features;
+  std::istringstream stream(csv);
+  std::string token;
+  while (std::getline(stream, token, ',')) {
+    const auto first = token.find_first_not_of(" \t");
+    if (first == std::string::npos) {
+      continue;
+    }
+    const auto last = token.find_last_not_of(" \t");
+    features.push_back(token.substr(first, last - first + 1));
+  }
+  return features;
+}
+
+} // namespace
+
+/// Forwards every argument to the base split reader selected at compile time
+/// and starts with no deletion bitmap and a base row number of 0.
+DeltaSplitReader::DeltaSplitReader(
+    const std::shared_ptr& hiveSplit,
+    const DeltaTableHandlePtr& tableHandle,
+    const DeltaColumnHandleMap* partitionKeys,
+    const ConnectorQueryCtx* connectorQueryCtx,
+    const std::shared_ptr& fileConfig,
+    const RowTypePtr& readerOutputType,
+    const std::shared_ptr& ioStatistics,
+    const std::shared_ptr& ioStats,
+    FileHandleFactory* fileHandleFactory,
+    folly::Executor* executor,
+    const std::shared_ptr& scanSpec,
+#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
+    const std::unordered_map* infoColumns,
+    std::vector bucketChannels,
+#endif
+    const common::SubfieldFilters* subfieldFiltersForValidation)
+#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
+    : HiveSplitReader(
+          hiveSplit,
+          tableHandle,
+          partitionKeys,
+          connectorQueryCtx,
+          fileConfig,
+          readerOutputType,
+          ioStatistics,
+          ioStats,
+          fileHandleFactory,
+          executor,
+          scanSpec,
+          infoColumns,
+          std::move(bucketChannels),
+          subfieldFiltersForValidation),
+#else
+    : SplitReader(
+          hiveSplit,
+          tableHandle,
+          partitionKeys,
+          connectorQueryCtx,
+          fileConfig,
+          readerOutputType,
+          ioStatistics,
+          ioStats,
+          fileHandleFactory,
+          executor,
+          scanSpec,
+          subfieldFiltersForValidation),
+#endif
+      baseReadRowNumber_(0),
+      deleteBitmap_(nullptr) {
+}
+
+/// Prepares the base reader, then loads the split's deletion vector (if any)
+/// after validating the table protocol and the file statistics.
+/// Throws a Velox user error when the protocol does not support deletion
+/// vectors, the statistics are inconsistent, or the payload is missing/invalid.
+void DeltaSplitReader::prepareSplit(
+    std::shared_ptr metadataFilter,
+    dwio::common::RuntimeStatistics& runtimeStats,
+    const folly::F14FastMap& fileReadOps) {
+  // Drop per-split state first so that none of it survives the early returns
+  // below.
+  baseReadRowNumber_ = 0;
+  deleteBitmap_.reset();
+  deletionVectorReader_.reset();
+
+#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
+  HiveSplitReader::prepareSplit(std::move(metadataFilter), runtimeStats, fileReadOps);
+  if (emptySplit() || !baseRowReader_) {
+    return;
+  }
+#else
+  SplitReader::prepareSplit(std::move(metadataFilter), runtimeStats, fileReadOps);
+  if (emptySplit_ || !baseRowReader_) {
+    return;
+  }
+#endif
+
+  auto deltaSplit = checkedPointerCast(hiveSplit_);
+  if (!deltaSplit->deletionVector.has_value()) {
+    return;
+  }
+
+  const auto& descriptor = *deltaSplit->deletionVector;
+
+  // Extract and validate protocol from Hadoop configuration (passed via fileReadOps)
+  auto readerVersionIt = fileReadOps.find(kReaderVersionOption);
+  auto writerVersionIt = fileReadOps.find(kWriterVersionOption);
+
+  if (readerVersionIt != fileReadOps.end() && writerVersionIt != fileReadOps.end()) {
+    DeltaProtocolInfo protocolInfo;
+    protocolInfo.minReaderVersion = parseProtocolVersion(kReaderVersionOption, readerVersionIt->second);
+    protocolInfo.minWriterVersion = parseProtocolVersion(kWriterVersionOption, writerVersionIt->second);
+
+    // Extract reader features if present
+    auto readerFeaturesIt = fileReadOps.find(kReaderFeaturesOption);
+    if (readerFeaturesIt != fileReadOps.end() && !readerFeaturesIt->second.empty()) {
+      protocolInfo.readerFeatures = parseFeatureList(readerFeaturesIt->second);
+    }
+
+    // Extract writer features if present
+    auto writerFeaturesIt = fileReadOps.find(kWriterFeaturesOption);
+    if (writerFeaturesIt != fileReadOps.end() && !writerFeaturesIt->second.empty()) {
+      protocolInfo.writerFeatures = parseFeatureList(writerFeaturesIt->second);
+    }
+
+    // Validate protocol for deletion vectors
+    validateProtocolForDeletionVectors(protocolInfo);
+  }
+
+  // Validate protocol if also provided in split (backward compatibility)
+  if (deltaSplit->protocolInfo.has_value()) {
+    validateProtocolForDeletionVectors(*deltaSplit->protocolInfo);
+  }
+
+  // Validate statistics if provided
+  if (deltaSplit->statistics.has_value()) {
+    validateStatisticsForDeletionVectors(*deltaSplit->statistics, descriptor);
+  }
+
+  VELOX_USER_CHECK(
+      descriptor.hasMaterializedPayload(),
+      "Delta deletion vector payload was not materialized on the JVM side for split {}",
+      hiveSplit_->filePath);
+
+  const auto& payloadView = descriptor.serializedPayloadView.value();
+  // The view's size is a signed 32-bit value; a negative size or a null pointer
+  // with a non-zero size would otherwise become an out-of-bounds string_view.
+  VELOX_USER_CHECK(
+      payloadView.size >= 0 && (payloadView.data != nullptr || payloadView.size == 0),
+      "Delta deletion vector payload view is invalid for split {}",
+      hiveSplit_->filePath);
+
+  deletionVectorReader_ = std::make_unique();
+  deletionVectorReader_->loadSerializedDeletionVector(
+      std::string_view(reinterpret_cast(payloadView.data), payloadView.size), descriptor.cardinality);
+}
+
+/// Reads the next batch of at most `size` rows into `output`, passing the rows
+/// selected by the deletion vector to the base row reader as deleted rows.
+/// Returns the number of rows scanned, or 0 at end of split.
+uint64_t DeltaSplitReader::next(uint64_t size, VectorPtr& output) {
+  Mutation mutation;
+  mutation.randomSkip = baseReaderOpts_.randomSkip().get();
+  mutation.deletedRows = nullptr;
+
+  const auto actualSize = baseRowReader_->nextReadSize(size);
+  baseReadRowNumber_ = baseRowReader_->nextRowNumber();
+  if (actualSize == RowReader::kAtEnd) {
+    return 0;
+  }
+
+  const auto deltaSplit = checkedPointerCast(hiveSplit_);
+  // With kIfNotContained the rows that are NOT in the deletion vector are
+  // dropped, so an empty deletion vector must drop every row rather than keep
+  // every row; the bitmap therefore has to be built even when the reader is
+  // empty.
+  // NOTE(review): kKeepAll with a deletion vector present is filtered exactly
+  // like kIfContained - confirm that this is the intended contract.
+  const bool dropRowsNotContained = deltaSplit->filterType == DeltaRowIndexFilterType::kIfNotContained;
+  if (deletionVectorReader_ && (dropRowsNotContained || !deletionVectorReader_->empty())) {
+    const auto numBytes = bits::nbytes(actualSize);
+    ensureCapacity(deleteBitmap_, numBytes, connectorQueryCtx_->memoryPool(), false, true);
+    deleteBitmap_->setSize(numBytes);
+    deletionVectorReader_->applyDeletionFilter(baseReadRowNumber_, actualSize, deleteBitmap_);
+    if (dropRowsNotContained) {
+      bits::negate(deleteBitmap_->asMutable(), actualSize);
+    }
+  } else if (deleteBitmap_) {
+    deleteBitmap_->setSize(0);
+  }
+
+  mutation.deletedRows = deleteBitmap_ && deleteBitmap_->size() > 0 ? deleteBitmap_->as() : nullptr;
+
+  auto rowsScanned = baseRowReader_->next(actualSize, output, &mutation);
+  if (rowsScanned > 0 && output->size() > 0 && !bucketChannels().empty()) {
+    applyBucketConversion(output, bucketConversionRows(*output->asChecked()));
+  }
+  return rowsScanned;
+}
+
+/// Throws a Velox user error describing the protocol found when `protocol`
+/// does not support deletion vectors.
+void DeltaSplitReader::validateProtocolForDeletionVectors(const DeltaProtocolInfo& protocol) {
+  if (!protocol.supportsDeletionVectors()) {
+    std::string readerFeatures = "none";
+    if (protocol.readerFeatures.has_value()) {
+      readerFeatures = folly::join(", ", *protocol.readerFeatures);
+    }
+
+    VELOX_USER_FAIL(
+        "Deletion vectors require reader protocol version 3+ and writer "
+        "protocol version 7+ with 'deletionVectors' feature enabled. "
+        "Found: minReaderVersion={}, minWriterVersion={}, readerFeatures=[{}]",
+        protocol.minReaderVersion,
+        protocol.minWriterVersion,
+        readerFeatures);
+  }
+}
+
+/// Throws a Velox user error when `stats` has no numRecords or when the
+/// deletion vector cardinality exceeds numRecords.
+void DeltaSplitReader::validateStatisticsForDeletionVectors(
+    const DeltaFileStatistics& stats,
+    const DeltaDeletionVectorDescriptor& dv) {
+  // Per Delta spec: numRecords is required when DV is present
+  if (!stats.numRecords.has_value()) {
+    VELOX_USER_FAIL(
+        "File statistics must include numRecords when deletion vector "
+        "is present. This is required by the Delta Lake protocol.");
+  }
+
+  // Validate cardinality doesn't exceed numRecords
+  if (dv.cardinality.has_value() && static_cast(*dv.cardinality) > *stats.numRecords) {
+    VELOX_USER_FAIL(
+        "Deletion vector cardinality ({}) exceeds file numRecords ({}). "
+        "This indicates data corruption or an invalid deletion vector.",
+        *dv.cardinality,
+        *stats.numRecords);
+  }
+
+  // This runs once per split with a deletion vector, so the message is emitted
+  // at verbose level rather than as a warning.
+  // NOTE(review): assumes tightBounds=false is the usual state for such files -
+  // confirm against the Delta statistics spec.
+  if (stats.tightBounds.has_value() && !*stats.tightBounds) {
+    VLOG(1) << "File has deletion vector with loose bounds (tightBounds=false). "
+            << "Column statistics (min/max) may not be accurate for aggregations. "
+            << "Consider running OPTIMIZE to compact the deletion vector.";
+  }
+}
+
+} // namespace gluten::delta
diff --git a/cpp/velox/compute/delta/DeltaSplitReader.h b/cpp/velox/compute/delta/DeltaSplitReader.h
new file mode 100644
index 000000000000..e1c02d06c814
--- /dev/null
+++ b/cpp/velox/compute/delta/DeltaSplitReader.h
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "compute/delta/DeltaDeletionVectorReader.h"
+#include "compute/delta/DeltaSplit.h"
+
+// Select the base split reader API: use the FileSplitReader-based one when the
+// Velox headers provide it, unless the macro was already defined by the build.
+#ifndef GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
+#if __has_include("velox/connectors/hive/FileSplitReader.h")
+#define GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER 1
+#else
+#define GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER 0
+#endif
+#endif
+
+#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
+#include "velox/connectors/hive/HiveSplitReader.h"
+#elif __has_include("velox/connectors/hive/SplitReader.h")
+#include "velox/connectors/hive/SplitReader.h"
+#else
+#include "velox/connectors/hive/HiveDataSource.h"
+#endif
+#include "velox/connectors/hive/TableHandle.h"
+
+namespace gluten::delta {
+
+using namespace facebook::velox;
+using namespace facebook::velox::connector;
+using namespace facebook::velox::connector::hive;
+
+// Aliases that hide the differences between the two base reader APIs.
+#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
+using DeltaSplitReaderBase = HiveSplitReader;
+using DeltaConfig = FileConfig;
+using DeltaTableHandlePtr = FileTableHandlePtr;
+using DeltaColumnHandleMap = std::unordered_map;
+#else
+using DeltaSplitReaderBase = SplitReader;
+using DeltaConfig = HiveConfig;
+using DeltaTableHandlePtr = HiveTableHandlePtr;
+using DeltaColumnHandleMap = HiveColumnHandleMap;
+#endif
+
+/// Split reader for Delta data files. It delegates reading to the base split
+/// reader and passes the rows selected by the split's deletion vector to the
+/// base row reader as deleted rows.
+class DeltaSplitReader : public DeltaSplitReaderBase {
+ public:
+  /// All arguments are forwarded to the base split reader; infoColumns and
+  /// bucketChannels exist only with the FileSplitReader-based API.
+  DeltaSplitReader(
+      const std::shared_ptr& hiveSplit,
+      const DeltaTableHandlePtr& tableHandle,
+      const DeltaColumnHandleMap* partitionKeys,
+      const ConnectorQueryCtx* connectorQueryCtx,
+      const std::shared_ptr& fileConfig,
+      const RowTypePtr& readerOutputType,
+      const std::shared_ptr& ioStatistics,
+      const std::shared_ptr& ioStats,
+      FileHandleFactory* fileHandleFactory,
+      folly::Executor* executor,
+      const std::shared_ptr& scanSpec,
+#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
+      const std::unordered_map* infoColumns,
+      std::vector bucketChannels = {},
+#endif
+      const common::SubfieldFilters* subfieldFiltersForValidation = nullptr);
+
+  /// Prepares the base reader, then validates the Delta protocol and file
+  /// statistics and loads the split's deletion vector when one is attached.
+  /// The protocol may be supplied through fileReadOps and/or on the split.
+  void prepareSplit(
+      std::shared_ptr metadataFilter,
+      dwio::common::RuntimeStatistics& runtimeStats,
+      const folly::F14FastMap& fileReadOps = {}) override;
+
+  /// Reads the next batch of at most `size` rows into `output` with the
+  /// deletion vector applied. Returns the number of rows scanned, 0 at end.
+  uint64_t next(uint64_t size, VectorPtr& output) override;
+
+ private:
+  /// Validate that the protocol supports deletion vectors.
+  /// Throws if protocol version is too low or feature flag is missing.
+  /// This is defense-in-depth validation - Gluten should already validate
+  /// at table level, but we check again at split level for safety.
+  void validateProtocolForDeletionVectors(const DeltaProtocolInfo& protocol);
+
+  /// Validate that file statistics are consistent with deletion vector.
+  /// Per Delta spec: numRecords is required when DV is present.
+  /// Also validates that cardinality doesn't exceed numRecords.
+  void validateStatisticsForDeletionVectors(const DeltaFileStatistics& stats, const DeltaDeletionVectorDescriptor& dv);
+
+  // Delta deletion vectors use file-global row positions, not split-relative
+  // row numbers.
+  // Row number of the first row of the batch being read, as reported by the
+  // base row reader.
+  uint64_t baseReadRowNumber_;
+  // Deletion vector of the current split; null when the split has none.
+  std::unique_ptr deletionVectorReader_;
+  // Per-batch bitmap of rows to drop, handed to the base row reader.
+  BufferPtr deleteBitmap_;
+};
+
+} // namespace gluten::delta
diff --git a/cpp/velox/compute/delta/tests/CMakeLists.txt b/cpp/velox/compute/delta/tests/CMakeLists.txt
index ec86cab0d714..f501eff0d300 100644
--- a/cpp/velox/compute/delta/tests/CMakeLists.txt
+++ b/cpp/velox/compute/delta/tests/CMakeLists.txt
@@ -22,3 +22,16 @@ add_test(
   NAME velox_roaring_bitmap_array_test
   COMMAND velox_roaring_bitmap_array_test
   WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
+
+add_executable(
+  velox_delta_read_test DeltaConnectorTest.cpp
+  DeltaDeletionVectorReaderTest.cpp DeltaSplitTest.cpp)
+
+target_link_libraries(
+  velox_delta_read_test velox roaring facebook::velox::exec_test_lib
+  GTest::gtest GTest::gtest_main)
+
+add_test(
+  NAME velox_delta_read_test
+  COMMAND velox_delta_read_test
+  WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
diff --git a/cpp/velox/compute/delta/tests/DeltaConnectorTest.cpp
b/cpp/velox/compute/delta/tests/DeltaConnectorTest.cpp new file mode 100644 index 000000000000..924142607faa --- /dev/null +++ b/cpp/velox/compute/delta/tests/DeltaConnectorTest.cpp @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#include
+
+#include "compute/delta/DeltaConnector.h"
+#include "velox/connectors/Connector.h"
+#include "velox/connectors/hive/HiveConfig.h"
+
+namespace gluten::delta {
+
+namespace {
+
+/// Fixture that registers a Delta connector under a fixed id and unregisters
+/// it again after every test.
+class DeltaConnectorTest : public ::testing::Test {
+ protected:
+  static constexpr const char* kConnectorId = "test-delta";
+
+  void TearDown() override {
+    unregisterConnector(kConnectorId);
+  }
+
+  /// Creates a connector through DeltaConnectorFactory with the given config
+  /// (an empty config by default) and registers it, replacing any connector
+  /// already registered under kConnectorId.
+  void registerDeltaConnector(
+      std::shared_ptr config =
+          std::make_shared(std::unordered_map{})) {
+    unregisterConnector(kConnectorId);
+
+    DeltaConnectorFactory factory;
+    registerConnector(factory.newConnector(kConnectorId, std::move(config)));
+  }
+};
+
+// The config handed to the factory is the one the registered connector exposes.
+TEST_F(DeltaConnectorTest, connectorConfiguration) {
+  auto customConfig = std::make_shared(std::unordered_map{
+      {hive::HiveConfig::kEnableFileHandleCache, "true"}, {hive::HiveConfig::kNumCacheFileHandles, "1000"}});
+
+  registerDeltaConnector(customConfig);
+
+  auto deltaConnector = getConnector(kConnectorId);
+  ASSERT_NE(deltaConnector, nullptr);
+
+  hive::HiveConfig hiveConfig(deltaConnector->connectorConfig());
+  ASSERT_TRUE(hiveConfig.isFileHandleCacheEnabled());
+  ASSERT_EQ(hiveConfig.numCacheFileHandles(), 1000);
+}
+
+// A default-configured connector reports dynamic-filter and split-preload
+// support.
+TEST_F(DeltaConnectorTest, connectorProperties) {
+  registerDeltaConnector();
+
+  auto deltaConnector = getConnector(kConnectorId);
+  ASSERT_NE(deltaConnector, nullptr);
+  ASSERT_TRUE(deltaConnector->canAddDynamicFilter());
+  ASSERT_TRUE(deltaConnector->supportsSplitPreload());
+}
+
+} // namespace
+
+} // namespace gluten::delta
diff --git a/cpp/velox/compute/delta/tests/DeltaDeletionVectorReaderTest.cpp b/cpp/velox/compute/delta/tests/DeltaDeletionVectorReaderTest.cpp
new file mode 100644
index 000000000000..f214e427e1a7
--- /dev/null
+++ b/cpp/velox/compute/delta/tests/DeltaDeletionVectorReaderTest.cpp
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#include "compute/delta/DeltaDeletionVectorReader.h"
+#include "compute/delta/RoaringBitmapArray.h"
+#include "velox/common/base/tests/GTestUtils.h"
+
+#include
+
+using namespace facebook::velox;
+using namespace gluten::delta;
+
+/// Fixture that provides a leaf memory pool and a helper that serializes a set
+/// of deleted row indices into a payload the reader can load.
+class DeltaDeletionVectorReaderTest : public ::testing::Test {
+ protected:
+  static void SetUpTestSuite() {
+    memory::MemoryManager::testingSetInstance(memory::MemoryManager::Options{});
+  }
+
+  void SetUp() override {
+    pool_ = memory::memoryManager()->addLeafPool();
+  }
+
+  /// Adds every row in `deletedRows` to a RoaringBitmapArray and returns its
+  /// serialized bytes as a string.
+  std::string createSerializedPayload(const std::vector& deletedRows) {
+    RoaringBitmapArray bitmap;
+    for (auto row : deletedRows) {
+      bitmap.addSafe(row);
+    }
+
+    const auto serializedSize = bitmap.serializedSizeInBytes();
+    auto buffer = AlignedBuffer::allocate(serializedSize, pool_.get());
+    bitmap.serialize(buffer->asMutable());
+    return std::string(buffer->as(), serializedSize);
+  }
+
+  std::shared_ptr pool_;
+};
+
+// Rows in the payload are reported deleted; other rows are not.
+TEST_F(DeltaDeletionVectorReaderTest, LoadSerializedPayload) {
+  auto payload = createSerializedPayload({2, 7, 12});
+
+  DeltaDeletionVectorReader reader;
+  reader.loadSerializedDeletionVector(payload, 3);
+
+  EXPECT_TRUE(reader.isRowDeleted(2));
+  EXPECT_TRUE(reader.isRowDeleted(7));
+  EXPECT_TRUE(reader.isRowDeleted(12));
+  EXPECT_FALSE(reader.isRowDeleted(0));
+  EXPECT_FALSE(reader.isRowDeleted(3));
+  EXPECT_FALSE(reader.isRowDeleted(20));
+}
+
+// A payload written by Delta itself (not by RoaringBitmapArray) loads too.
+TEST_F(DeltaDeletionVectorReaderTest, LoadPortablePayload) {
+  // Captured from a Delta 3.3.2 table after `DELETE WHERE id < 10`.
+  const std::vector payloadBytes = {0xd1, 0xd3, 0x39, 0x64, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+                                    0x00, 0x00, 0x00, 0x00, 0x00, 0x3b, 0x30, 0x00, 0x00, 0x01, 0x00,
+                                    0x00, 0x09, 0x00, 0x01, 0x00, 0x00, 0x00, 0x09, 0x00};
+
+  DeltaDeletionVectorReader reader;
+  reader.loadSerializedDeletionVector(
+      std::string_view(reinterpret_cast(payloadBytes.data()), payloadBytes.size()), 10);
+
+  for (uint64_t deleted = 0; deleted < 10; ++deleted) {
+    EXPECT_TRUE(reader.isRowDeleted(deleted));
+  }
+  EXPECT_FALSE(reader.isRowDeleted(10));
+  EXPECT_FALSE(reader.isRowDeleted(100));
+}
+
+// With a base row of 0, bit i of the bitmap corresponds to row i.
+TEST_F(DeltaDeletionVectorReaderTest, ApplyDeletionFilter) {
+  auto payload = createSerializedPayload({2, 5, 8});
+
+  DeltaDeletionVectorReader reader;
+  reader.loadSerializedDeletionVector(payload);
+
+  auto deleteBitmap = AlignedBuffer::allocate(bits::nwords(10), pool_.get());
+  reader.applyDeletionFilter(0, 10, deleteBitmap);
+
+  auto* rawBitmap = deleteBitmap->as();
+  EXPECT_TRUE(bits::isBitSet(rawBitmap, 2));
+  EXPECT_TRUE(bits::isBitSet(rawBitmap, 5));
+  EXPECT_TRUE(bits::isBitSet(rawBitmap, 8));
+  EXPECT_FALSE(bits::isBitSet(rawBitmap, 1));
+  EXPECT_FALSE(bits::isBitSet(rawBitmap, 7));
+}
+
+// With a base row of 10, bit i of the bitmap corresponds to row 10 + i.
+TEST_F(DeltaDeletionVectorReaderTest, ApplyDeletionFilterWithOffset) {
+  auto payload = createSerializedPayload({10, 15, 20});
+
+  DeltaDeletionVectorReader reader;
+  reader.loadSerializedDeletionVector(payload);
+
+  auto deleteBitmap = AlignedBuffer::allocate(bits::nwords(15), pool_.get());
+  reader.applyDeletionFilter(10, 15, deleteBitmap);
+
+  auto* rawBitmap = deleteBitmap->as();
+  EXPECT_TRUE(bits::isBitSet(rawBitmap, 0));
+  EXPECT_TRUE(bits::isBitSet(rawBitmap, 5));
+  EXPECT_TRUE(bits::isBitSet(rawBitmap, 10));
+  EXPECT_FALSE(bits::isBitSet(rawBitmap, 1));
+  EXPECT_FALSE(bits::isBitSet(rawBitmap, 14));
+}
+
+// A reader with nothing loaded is empty and marks no rows.
+TEST_F(DeltaDeletionVectorReaderTest, EmptyReader) {
+  DeltaDeletionVectorReader reader;
+
+  EXPECT_TRUE(reader.empty());
+  EXPECT_FALSE(reader.isRowDeleted(0));
+
+  auto deleteBitmap = AlignedBuffer::allocate(bits::nwords(10), pool_.get());
+  reader.applyDeletionFilter(0, 10, deleteBitmap);
+
+  auto* rawBitmap = deleteBitmap->as();
+  for (int i = 0; i < 10; ++i) {
+    EXPECT_FALSE(bits::isBitSet(rawBitmap, i));
+  }
+}
+
+// Loading a second payload replaces the first instead of merging with it.
+TEST_F(DeltaDeletionVectorReaderTest, MultipleLoadsOverwrite) {
+  DeltaDeletionVectorReader reader;
+  reader.loadSerializedDeletionVector(createSerializedPayload({1, 2, 3}));
+  EXPECT_TRUE(reader.isRowDeleted(1));
+  EXPECT_FALSE(reader.isRowDeleted(10));
+
+  reader.loadSerializedDeletionVector(createSerializedPayload({10, 20, 30}));
+  EXPECT_FALSE(reader.isRowDeleted(1));
+  EXPECT_TRUE(reader.isRowDeleted(10));
+}
+
+// An empty payload is rejected.
+TEST_F(DeltaDeletionVectorReaderTest, EmptyPayloadThrows) {
+  DeltaDeletionVectorReader reader;
+  VELOX_ASSERT_THROW(reader.loadSerializedDeletionVector(std::string_view()), "Serialized deletion vector is empty");
+}
+
+// A payload that starts with the wrong magic number is rejected.
+TEST_F(DeltaDeletionVectorReaderTest, CorruptedMagicNumberThrows) {
+  const uint32_t wrongMagic = 12345678;
+  const std::string payload(reinterpret_cast(&wrongMagic), sizeof(wrongMagic));
+
+  DeltaDeletionVectorReader reader;
+  VELOX_ASSERT_THROW(reader.loadSerializedDeletionVector(payload), "Unexpected Delta bitmap array magic number");
+}
+
+// A cardinality that matches the payload is accepted.
+TEST_F(DeltaDeletionVectorReaderTest, CardinalityValidationSuccess) {
+  auto payload = createSerializedPayload({1, 2, 3, 4, 5});
+
+  DeltaDeletionVectorReader reader;
+  EXPECT_NO_THROW(reader.loadSerializedDeletionVector(payload, 5));
+  EXPECT_EQ(reader.estimatedDeletedRowCount(), 5);
+}
+
+// A cardinality that disagrees with the payload is a user error.
+TEST_F(DeltaDeletionVectorReaderTest, CardinalityValidationMismatchThrows) {
+  auto payload = createSerializedPayload({1, 2, 3, 4, 5});
+
+  DeltaDeletionVectorReader reader;
+  EXPECT_THROW(reader.loadSerializedDeletionVector(payload, 3), VeloxUserError);
+}
+
+// Cardinality validation with 1000 deleted rows (every 10th row below 10000).
+TEST_F(DeltaDeletionVectorReaderTest, LargeCardinalityValidation) {
+  std::vector deletedRows;
+  for (int64_t i = 0; i < 10000; i += 10) {
+    deletedRows.push_back(i);
+  }
+
+  DeltaDeletionVectorReader reader;
+  reader.loadSerializedDeletionVector(createSerializedPayload(deletedRows), 1000);
+  EXPECT_EQ(reader.estimatedDeletedRowCount(), 1000);
+}
+
+// Deleted rows 45..55 seen through a batch covering rows 40..59: only bits
+// 5..15 are set.
+TEST_F(DeltaDeletionVectorReaderTest, BatchFilteringPartialOverlap) {
+  std::vector deletedRows;
+  for (int64_t i = 45; i <= 55; ++i) {
+    deletedRows.push_back(i);
+  }
+
+  DeltaDeletionVectorReader reader;
+  reader.loadSerializedDeletionVector(createSerializedPayload(deletedRows));
+
+  auto deleteBitmap = AlignedBuffer::allocate(bits::nwords(20), pool_.get());
+  reader.applyDeletionFilter(40, 20, deleteBitmap);
+
+  auto* rawBitmap = deleteBitmap->as();
+  for (int i = 0; i < 5; ++i) {
+    EXPECT_FALSE(bits::isBitSet(rawBitmap, i));
+  }
+  for (int i = 5; i <= 15; ++i) {
+    EXPECT_TRUE(bits::isBitSet(rawBitmap, i));
+  }
+  for (int i = 16; i < 20; ++i) {
+    EXPECT_FALSE(bits::isBitSet(rawBitmap, i));
+  }
+}
diff --git a/cpp/velox/compute/delta/tests/DeltaSplitTest.cpp b/cpp/velox/compute/delta/tests/DeltaSplitTest.cpp
new file mode 100644
index 000000000000..6b210ec730d3
--- /dev/null
+++ b/cpp/velox/compute/delta/tests/DeltaSplitTest.cpp
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "compute/delta/DeltaSplit.h" + +using namespace gluten::delta; + +TEST(DeltaSplitTest, DescriptorCarriesPayloadView) { /* A descriptor created by serialized(3, payloadView) must keep the payload view (same size as the source string), keep cardinality 3, and report a materialized payload. */ + const std::string payload = "payload"; + gluten::SplitPayloadBufferView payloadView{ + reinterpret_cast(payload.data()), static_cast(payload.size())}; + + auto descriptor = DeltaDeletionVectorDescriptor::serialized(3, payloadView); + + ASSERT_TRUE(descriptor.serializedPayloadView.has_value()); + EXPECT_EQ(descriptor.serializedPayloadView->size, payload.size()); + EXPECT_EQ(descriptor.cardinality, 3); + EXPECT_TRUE(descriptor.hasMaterializedPayload()); +} + +TEST(DeltaSplitTest, SplitCarriesDeletionVectorDescriptor) { /* Builds a split around a serialized deletion vector descriptor (cardinality 2) and checks that the split exposes that descriptor. */ + const std::string payload = "serialized"; + gluten::SplitPayloadBufferView payloadView{ + reinterpret_cast(payload.data()), static_cast(payload.size())}; + auto descriptor = DeltaDeletionVectorDescriptor::serialized(2, payloadView); + + auto split = std::make_shared( + "test-delta", + "/tmp/data.parquet", + facebook::velox::dwio::common::FileFormat::PARQUET, + 0, + 1024, + std::unordered_map>{}, + std::nullopt, + std::unordered_map{{"table_format", "delta"}}, + nullptr, + std::unordered_map{}, + true, + descriptor, + std::nullopt, + std::nullopt, + DeltaRowIndexFilterType::kIfContained, + std::unordered_map{}, + std::nullopt); + + ASSERT_TRUE(split->deletionVector.has_value()); + EXPECT_EQ(split->deletionVector->cardinality, 2); + 
ASSERT_TRUE(split->deletionVector->serializedPayloadView.has_value()); + EXPECT_EQ(split->deletionVector->serializedPayloadView->size, payload.size()); + EXPECT_EQ(split->filterType, DeltaRowIndexFilterType::kIfContained); +} + +TEST(DeltaSplitTest, LogicalRowCountSubtractsDeletionVectorCardinality) { /* 10 recorded rows minus a deletion vector of cardinality 3 must give a logical row count of 7. */ + DeltaFileStatistics stats{.numRecords = 10, .tightBounds = true}; + auto descriptor = DeltaDeletionVectorDescriptor::serialized(3); + + EXPECT_EQ(stats.logicalRowCount(descriptor), 7); +} + +TEST(DeltaSplitTest, LogicalRowCountPreservesUnknownCounts) { /* With numRecords left as nullopt the logical row count must be reported as -1, regardless of the deletion vector cardinality. */ + DeltaFileStatistics stats{.numRecords = std::nullopt, .tightBounds = std::nullopt}; + auto descriptor = DeltaDeletionVectorDescriptor::serialized(3); + + EXPECT_EQ(stats.logicalRowCount(descriptor), -1); +} From 66ea4609ab24b7d8dead713204d4cc5e8437e279 Mon Sep 17 00:00:00 2001 From: Mohammad Linjawi Date: Mon, 4 May 2026 16:57:06 +0300 Subject: [PATCH 2/3] fix: define split payload view for native Delta reader --- cpp/core/compute/Runtime.h | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h index 9d8315731fc5..4ab944898bd6 100644 --- a/cpp/core/compute/Runtime.h +++ b/cpp/core/compute/Runtime.h @@ -56,6 +56,11 @@ struct SparkTaskInfo { } }; +struct SplitPayloadBufferView { /* Pointer-and-length view over a byte buffer; the struct holds only these two fields and manages no storage itself. */ + const uint8_t* data; + int32_t size; +}; + class Runtime : public std::enable_shared_from_this { public: using Factory = std::function Date: Thu, 7 May 2026 13:20:07 +0300 Subject: [PATCH 3/3] fix: align Delta reader with split IO stats API --- cpp/velox/compute/delta/DeltaDataSource.cpp | 3 ++- cpp/velox/compute/delta/DeltaSplitReader.cpp | 8 +++++++- cpp/velox/compute/delta/DeltaSplitReader.h | 5 +++++ 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/cpp/velox/compute/delta/DeltaDataSource.cpp b/cpp/velox/compute/delta/DeltaDataSource.cpp index e7ec73007b99..f8c00c2b077f 100644 --- a/cpp/velox/compute/delta/DeltaDataSource.cpp +++ 
b/cpp/velox/compute/delta/DeltaDataSource.cpp @@ -65,7 +65,8 @@ std::unique_ptr DeltaDataSource::createSplitReader() { connectorQueryCtx_, fileConfig_, readerOutputType_, - ioStatistics_, + dataIoStats_, /* the single ioStatistics_ argument is replaced by separate data and metadata IO stats */ + metadataIoStats_, ioStats_, fileHandleFactory_, ioExecutor_, diff --git a/cpp/velox/compute/delta/DeltaSplitReader.cpp b/cpp/velox/compute/delta/DeltaSplitReader.cpp index 1b34a5cf4280..98766bd77074 100644 --- a/cpp/velox/compute/delta/DeltaSplitReader.cpp +++ b/cpp/velox/compute/delta/DeltaSplitReader.cpp @@ -50,7 +50,12 @@ DeltaSplitReader::DeltaSplitReader( const ConnectorQueryCtx* connectorQueryCtx, const std::shared_ptr& fileConfig, const RowTypePtr& readerOutputType, +#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER /* this branch takes separate data and metadata IO stats; the else branch keeps the single ioStatistics parameter */ + const std::shared_ptr& dataIoStats, + const std::shared_ptr& metadataIoStats, +#else const std::shared_ptr& ioStatistics, +#endif const std::shared_ptr& ioStats, FileHandleFactory* fileHandleFactory, folly::Executor* executor, @@ -68,7 +73,8 @@ DeltaSplitReader::DeltaSplitReader( connectorQueryCtx, fileConfig, readerOutputType, - ioStatistics, + dataIoStats, /* NOTE(review): dataIoStats and metadataIoStats are declared only under the GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER branch of the parameter list; confirm this argument list is guarded the same way, otherwise the else configuration would reference undeclared names. */ + metadataIoStats, ioStats, fileHandleFactory, executor, diff --git a/cpp/velox/compute/delta/DeltaSplitReader.h b/cpp/velox/compute/delta/DeltaSplitReader.h index e1c02d06c814..517cf6c67d5e 100644 --- a/cpp/velox/compute/delta/DeltaSplitReader.h +++ b/cpp/velox/compute/delta/DeltaSplitReader.h @@ -79,7 +79,12 @@ class DeltaSplitReader : public DeltaSplitReaderBase { const ConnectorQueryCtx* connectorQueryCtx, const std::shared_ptr& fileConfig, const RowTypePtr& readerOutputType, +#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER /* must stay in step with the constructor definition in DeltaSplitReader.cpp, which selects its parameters with the same macro */ + const std::shared_ptr& dataIoStats, + const std::shared_ptr& metadataIoStats, +#else const std::shared_ptr& ioStatistics, +#endif const std::shared_ptr& ioStats, FileHandleFactory* fileHandleFactory, folly::Executor* executor,