Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cpp/core/compute/Runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ struct SparkTaskInfo {
}
};

struct SplitPayloadBufferView {
const uint8_t* data;
int32_t size;
};

class Runtime : public std::enable_shared_from_this<Runtime> {
public:
using Factory = std::function<Runtime*(
Expand Down
7 changes: 6 additions & 1 deletion cpp/velox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ set(VELOX_SRCS
compute/VeloxRuntime.cc
compute/VeloxPlanConverter.cc
compute/WholeStageResultIterator.cc
compute/delta/DeltaConnector.cpp
compute/delta/DeltaDataSource.cpp
compute/delta/DeltaDeletionVectorReader.cpp
compute/delta/DeltaSplit.cpp
compute/delta/DeltaSplitReader.cpp
compute/delta/RoaringBitmapArray.cpp
compute/iceberg/IcebergPlanConverter.cc
jni/JniFileSystem.cc
Expand Down Expand Up @@ -390,8 +395,8 @@ find_package(
target_link_libraries(velox PUBLIC ICU::i18n ICU::uc ICU::data)

if(BUILD_TESTS)
add_subdirectory(tests)
add_subdirectory(compute/delta/tests)
add_subdirectory(tests)
endif()

if(BUILD_BENCHMARKS)
Expand Down
7 changes: 7 additions & 0 deletions cpp/velox/compute/VeloxBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/executors/task_queue/UnboundedBlockingQueue.h>

#include "compute/delta/DeltaConnector.h"
#include "operators/functions/RegistrationAllFunctions.h"
#include "operators/plannodes/RowVectorStream.h"
#include "utils/ConfigExtractor.h"
Expand Down Expand Up @@ -323,6 +324,12 @@ std::shared_ptr<facebook::velox::connector::Connector> VeloxBackend::createHiveC
return std::make_shared<velox::connector::hive::HiveConnector>(connectorId, hiveConnectorConfig_, ioExecutor);
}

std::shared_ptr<facebook::velox::connector::Connector> VeloxBackend::createDeltaConnector(
const std::string& connectorId,
folly::Executor* ioExecutor) const {
return std::make_shared<delta::DeltaConnector>(connectorId, hiveConnectorConfig_, ioExecutor);
}

std::shared_ptr<facebook::velox::connector::Connector> VeloxBackend::createValueStreamConnector(
const std::string& connectorId,
bool dynamicFilterEnabled) const {
Expand Down
4 changes: 4 additions & 0 deletions cpp/velox/compute/VeloxBackend.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ class VeloxBackend {
const std::string& connectorId,
folly::Executor* ioExecutor) const;

std::shared_ptr<facebook::velox::connector::Connector> createDeltaConnector(
const std::string& connectorId,
folly::Executor* ioExecutor) const;

std::shared_ptr<facebook::velox::connector::Connector> createValueStreamConnector(
const std::string& connectorId,
bool dynamicFilterEnabled) const;
Expand Down
2 changes: 2 additions & 0 deletions cpp/velox/compute/VeloxConnectorIds.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ namespace gluten {

struct VeloxConnectorIds {
std::string hive;
std::string delta;
std::string iterator;
std::string cudfHive;
bool hiveRegistered{false};
bool deltaRegistered{false};
bool iteratorRegistered{false};
bool cudfHiveRegistered{false};
};
Expand Down
13 changes: 13 additions & 0 deletions cpp/velox/compute/VeloxRuntime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "compute/ResultIterator.h"
#include "compute/Runtime.h"
#include "compute/VeloxPlanConverter.h"
#include "compute/delta/DeltaConnector.h"
#include "config/VeloxConfig.h"
#include "operators/plannodes/IteratorSplit.h"
#include "operators/serializer/VeloxRowToColumnarConverter.h"
Expand Down Expand Up @@ -213,6 +214,7 @@ std::string makeScopedConnectorId(const std::string& base, uint64_t runtimeId) {
VeloxConnectorIds makeScopedConnectorIds(uint64_t runtimeId) {
return VeloxConnectorIds{
.hive = makeScopedConnectorId(kHiveConnectorId, runtimeId),
.delta = makeScopedConnectorId(delta::DeltaConnectorFactory::kDeltaConnectorName, runtimeId),
.iterator = makeScopedConnectorId(kIteratorConnectorId, runtimeId),
.cudfHive = makeScopedConnectorId(kCudfHiveConnectorId, runtimeId)};
}
Expand Down Expand Up @@ -271,6 +273,13 @@ void VeloxRuntime::registerConnectors() {
velox::connector::hasConnector(connectorIds_.hive),
"Scoped hive connector not found after registration: " + connectorIds_.hive);

connectorIds_.deltaRegistered =
velox::connector::registerConnector(backend->createDeltaConnector(connectorIds_.delta, ioExecutor_.get()));
GLUTEN_CHECK(connectorIds_.deltaRegistered, "Failed to register scoped delta connector: " + connectorIds_.delta);
GLUTEN_CHECK(
velox::connector::hasConnector(connectorIds_.delta),
"Scoped delta connector not found after registration: " + connectorIds_.delta);

const auto valueStreamDynamicFilterEnabled =
veloxCfg_->get<bool>(kValueStreamDynamicFilterEnabled, kValueStreamDynamicFilterEnabledDefault);
connectorIds_.iteratorRegistered = velox::connector::registerConnector(
Expand Down Expand Up @@ -306,6 +315,10 @@ void VeloxRuntime::unregisterConnectors() {
velox::connector::unregisterConnector(connectorIds_.iterator);
connectorIds_.iteratorRegistered = false;
}
if (connectorIds_.deltaRegistered) {
velox::connector::unregisterConnector(connectorIds_.delta);
connectorIds_.deltaRegistered = false;
}
if (connectorIds_.hiveRegistered) {
velox::connector::unregisterConnector(connectorIds_.hive);
connectorIds_.hiveRegistered = false;
Expand Down
1 change: 1 addition & 0 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ std::shared_ptr<velox::core::QueryCtx> WholeStageResultIterator::createNewVeloxQ
std::unordered_map<std::string, std::shared_ptr<velox::config::ConfigBase>> connectorConfigs;
auto hiveSessionConfig = createHiveConnectorSessionConfig(veloxCfg_);
connectorConfigs[connectorIds_.hive] = hiveSessionConfig;
connectorConfigs[connectorIds_.delta] = hiveSessionConfig;
connectorConfigs[connectorIds_.iterator] = hiveSessionConfig;
#ifdef GLUTEN_ENABLE_GPU
if (!connectorIds_.cudfHive.empty()) {
Expand Down
48 changes: 48 additions & 0 deletions cpp/velox/compute/delta/DeltaConnector.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "compute/delta/DeltaConnector.h"

#include "compute/delta/DeltaDataSource.h"

namespace gluten::delta {

std::unique_ptr<DataSource> DeltaConnector::createDataSource(
const RowTypePtr& outputType,
const ConnectorTableHandlePtr& tableHandle,
const ColumnHandleMap& columnHandles,
ConnectorQueryCtx* connectorQueryCtx) {
return std::make_unique<DeltaDataSource>(
outputType, tableHandle, columnHandles, &fileHandleFactory_, ioExecutor_, connectorQueryCtx, hiveConfig_);
}

} // namespace gluten::delta
70 changes: 70 additions & 0 deletions cpp/velox/compute/delta/DeltaConnector.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/connectors/hive/HiveConnector.h"

namespace gluten::delta {

using namespace facebook::velox;
using namespace facebook::velox::connector;
using namespace facebook::velox::connector::hive;

class DeltaConnector final : public HiveConnector {
public:
DeltaConnector(const std::string& id, std::shared_ptr<const config::ConfigBase> config, folly::Executor* ioExecutor)
: HiveConnector(id, std::move(config), ioExecutor) {}

std::unique_ptr<DataSource> createDataSource(
const RowTypePtr& outputType,
const ConnectorTableHandlePtr& tableHandle,
const ColumnHandleMap& columnHandles,
ConnectorQueryCtx* connectorQueryCtx) override;
};

class DeltaConnectorFactory final : public ConnectorFactory {
public:
static constexpr const char* kDeltaConnectorName = "delta";

DeltaConnectorFactory() : ConnectorFactory(kDeltaConnectorName) {}

std::shared_ptr<Connector> newConnector(
const std::string& id,
std::shared_ptr<const config::ConfigBase> config,
folly::Executor* ioExecutor = nullptr,
[[maybe_unused]] folly::Executor* cpuExecutor = nullptr) override {
return std::make_shared<DeltaConnector>(id, std::move(config), ioExecutor);
}
};

} // namespace gluten::delta
98 changes: 98 additions & 0 deletions cpp/velox/compute/delta/DeltaDataSource.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "compute/delta/DeltaDataSource.h"

#include "compute/delta/DeltaSplitReader.h"

namespace gluten::delta {

DeltaDataSource::DeltaDataSource(
const RowTypePtr& outputType,
const ConnectorTableHandlePtr& tableHandle,
const ColumnHandleMap& assignments,
FileHandleFactory* fileHandleFactory,
folly::Executor* ioExecutor,
const ConnectorQueryCtx* connectorQueryCtx,
const std::shared_ptr<HiveConfig>& hiveConfig)
: HiveDataSource(
outputType,
tableHandle,
assignments,
fileHandleFactory,
ioExecutor,
connectorQueryCtx,
hiveConfig) {}

#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
std::unique_ptr<FileSplitReader> DeltaDataSource::createSplitReader() {
auto bucketChannels = prepareSplit();
auto deltaSplit = checkedPointerCast<const HiveDeltaSplit>(split_);

return std::make_unique<DeltaSplitReader>(
deltaSplit,
tableHandle_,
&partitionKeys_,
connectorQueryCtx_,
fileConfig_,
readerOutputType_,
dataIoStats_,
metadataIoStats_,
ioStats_,
fileHandleFactory_,
ioExecutor_,
scanSpec_,
&infoColumns_,
std::move(bucketChannels),
/*subfieldFiltersForValidation=*/getFilters());
}
#else
std::unique_ptr<SplitReader> DeltaDataSource::createSplitReader() {
auto deltaSplit = checkedPointerCast<const HiveDeltaSplit>(split_);

return std::make_unique<DeltaSplitReader>(
deltaSplit,
hiveTableHandle_,
&partitionKeys_,
connectorQueryCtx_,
hiveConfig_,
readerOutputType_,
ioStatistics_,
ioStats_,
fileHandleFactory_,
ioExecutor_,
scanSpec_,
/*subfieldFiltersForValidation=*/getFilters());
}
#endif

} // namespace gluten::delta
Loading
Loading