Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/velox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ set(VELOX_SRCS
utils/VeloxWriterUtils.cc)

# When S3 support is enabled, add the Gluten S3 file system source to
# VELOX_SRCS and look up ZLIB.
if(ENABLE_S3)
list(APPEND VELOX_SRCS filesystem/GlutenS3FileSystem.cc)
find_package(ZLIB)
endif()

Expand Down
8 changes: 5 additions & 3 deletions cpp/velox/compute/VeloxBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@

#include "compute/VeloxRuntime.h"
#include "config/VeloxConfig.h"
#ifdef ENABLE_S3
#include "filesystem/GlutenS3FileSystem.h"
#endif
#include "jni/JniFileSystem.h"
#include "memory/GlutenBufferedInputBuilder.h"
#include "operators/functions/SparkExprToSubfieldFilterParser.h"
Expand All @@ -55,7 +58,6 @@
#include "velox/connectors/hive/storage_adapters/gcs/RegisterGcsFileSystem.h" // @manual
#include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h"
#include "velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.h" // @manual
#include "velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.h" // @manual
#include "velox/dwio/orc/reader/OrcReader.h"
#include "velox/dwio/parquet/RegisterParquetReader.h"
#include "velox/dwio/parquet/RegisterParquetWriter.h"
Expand Down Expand Up @@ -155,7 +157,7 @@ void VeloxBackend::init(
velox::filesystems::registerHdfsFileSystem();
#endif
#ifdef ENABLE_S3
velox::filesystems::registerS3FileSystem();
registerGlutenS3FileSystem();
#endif
#ifdef ENABLE_GCS
velox::filesystems::registerGcsFileSystem();
Expand Down Expand Up @@ -370,7 +372,7 @@ void VeloxBackend::tearDown() {
}
#endif
#ifdef ENABLE_S3
velox::filesystems::finalizeS3FileSystem();
finalizeGlutenS3FileSystem();
#endif

// Destruct IOThreadPoolExecutor will join all threads.
Expand Down
55 changes: 55 additions & 0 deletions cpp/velox/filesystem/GlutenS3FileSystem.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "filesystem/GlutenS3FileSystem.h"

#include <memory>
#include <utility>

#include "velox/common/file/File.h"

namespace gluten {

namespace velox = facebook::velox;
namespace filesystems = facebook::velox::filesystems;

namespace {

// Factory handed to filesystems::registerS3FileSystem by
// registerGlutenS3FileSystem(). Builds a GlutenS3FileSystem from the given
// bucket name and config and returns it through the generic FileSystem
// pointer type.
std::shared_ptr<filesystems::FileSystem> glutenS3FileSystemFactory(
    std::string_view bucketName,
    std::shared_ptr<const velox::config::ConfigBase> config) {
  std::shared_ptr<filesystems::FileSystem> glutenFs =
      std::make_shared<GlutenS3FileSystem>(bucketName, config);
  return glutenFs;
}

} // namespace

// Opens `s3Path` for writing. This override adds no behavior of its own: it
// forwards both arguments to the S3FileSystem implementation and returns the
// resulting WriteFile.
std::unique_ptr<velox::WriteFile> GlutenS3FileSystem::openFileForWrite(
    std::string_view s3Path,
    const filesystems::FileOptions& options) {
  using Base = filesystems::S3FileSystem;
  // Qualified with the base class so the call does not dispatch back to this
  // override.
  std::unique_ptr<velox::WriteFile> writeFile =
      Base::openFileForWrite(s3Path, options);
  return writeFile;
}

// Registers the S3 file system through Velox's registerS3FileSystem, passing
// along the caller's cache-key function and using glutenS3FileSystemFactory
// as the file system factory.
void registerGlutenS3FileSystem(filesystems::CacheKeyFn cacheKeyFunc) {
  auto* const factory = &glutenS3FileSystemFactory;
  filesystems::registerS3FileSystem(std::move(cacheKeyFunc), factory);
}

void finalizeGlutenS3FileSystem() {
filesystems::finalizeS3FileSystem();
}

} // namespace gluten
47 changes: 47 additions & 0 deletions cpp/velox/filesystem/GlutenS3FileSystem.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <memory>
#include <string_view>

#include "velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h"

namespace gluten {

namespace velox = facebook::velox;

class GlutenS3FileSystem : public velox::filesystems::S3FileSystem {
public:
GlutenS3FileSystem(
std::string_view bucketName,
const std::shared_ptr<const velox::config::ConfigBase>& config)
: S3FileSystem(bucketName, config) {}

std::unique_ptr<velox::WriteFile> openFileForWrite(
std::string_view s3Path,
const velox::filesystems::FileOptions& options) override;
};

/// Registers the S3 file system with Velox using a factory that creates
/// GlutenS3FileSystem instances.
/// @param cacheKeyFunc Cache-key function forwarded to Velox's
/// registerS3FileSystem; defaults to nullptr.
void registerGlutenS3FileSystem(
velox::filesystems::CacheKeyFn cacheKeyFunc = nullptr);

/// Finalizes the S3 file system by forwarding to Velox's
/// finalizeS3FileSystem().
void finalizeGlutenS3FileSystem();

} // namespace gluten
3 changes: 3 additions & 0 deletions cpp/velox/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ add_velox_test(spark_functions_test SOURCES SparkFunctionTest.cc
add_velox_test(runtime_test SOURCES RuntimeTest.cc)
add_velox_test(velox_memory_test SOURCES MemoryManagerTest.cc)
add_velox_test(buffer_outputstream_test SOURCES BufferOutputStreamTest.cc)
# The GlutenS3FileSystem test is only added when S3 support is enabled.
if(ENABLE_S3)
add_velox_test(gluten_s3_file_system_test SOURCES GlutenS3FileSystemTest.cc)
endif()
if(BUILD_EXAMPLES)
add_velox_test(my_udf_test SOURCES MyUdfTest.cc)
endif()
Expand Down
71 changes: 71 additions & 0 deletions cpp/velox/tests/GlutenS3FileSystemTest.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "filesystem/GlutenS3FileSystem.h"

#include <gtest/gtest.h>

#include <memory>
#include <string>
#include <unordered_map>

#include "velox/common/config/Config.h"
#include "velox/common/file/FileSystems.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3Config.h"

namespace gluten {
namespace {

namespace velox = facebook::velox;
namespace filesystems = facebook::velox::filesystems;

// Checks that once registerGlutenS3FileSystem() has run, resolving an s3://
// path through filesystems::getFileSystem yields a GlutenS3FileSystem whose
// name() is "S3".
TEST(GlutenS3FileSystemTest, registeredFileSystemUsesGlutenSubclass) {
  using filesystems::S3Config;
  using Keys = S3Config::Keys;

  registerGlutenS3FileSystem();

  // Shorthand for turning an S3Config key enumerator into its base config key.
  const auto baseKey = [](Keys key) { return S3Config::baseConfigKey(key); };

  auto config = std::make_shared<velox::config::ConfigBase>(
      std::unordered_map<std::string, std::string>{
          {baseKey(Keys::kEndpoint), "http://127.0.0.1:9000"},
          {baseKey(Keys::kAccessKey), "access"},
          {baseKey(Keys::kSecretKey), "secret"},
          {baseKey(Keys::kSSLEnabled), "false"},
          {baseKey(Keys::kPathStyleAccess), "true"},
          {baseKey(Keys::kIMDSEnabled), "false"}});

  auto fileSystem =
      filesystems::getFileSystem("s3://gluten-test-bucket/test", config);

  // The registered factory must have produced the Gluten subclass.
  EXPECT_NE(dynamic_cast<GlutenS3FileSystem*>(fileSystem.get()), nullptr);
  EXPECT_EQ(fileSystem->name(), "S3");

  // Release our reference to the file system before finalizing.
  // NOTE(review): presumably finalization expects no live instances — confirm.
  fileSystem.reset();
  finalizeGlutenS3FileSystem();
}

} // namespace
} // namespace gluten
Loading