From d8aec3010d118c11d17eca32bdd9e8c417043cf9 Mon Sep 17 00:00:00 2001 From: shaoting-huang Date: Tue, 11 Feb 2025 15:47:19 +0800 Subject: [PATCH] explicitly specify column groups for storage v2 api Signed-off-by: shaoting-huang --- internal/core/src/common/type_c.h | 1 + internal/core/src/segcore/arrow_fs_c.cpp | 74 ++++++++++++++++ internal/core/src/segcore/arrow_fs_c.h | 40 +++++++++ internal/core/src/segcore/column_groups_c.cpp | 47 +++++++++++ internal/core/src/segcore/column_groups_c.h | 35 ++++++++ internal/core/src/segcore/packed_reader_c.cpp | 44 +++++----- internal/core/src/segcore/packed_reader_c.h | 10 ++- internal/core/src/segcore/packed_writer_c.cpp | 51 +++++++---- internal/core/src/segcore/packed_writer_c.h | 15 ++-- .../thirdparty/milvus-storage/CMakeLists.txt | 2 +- internal/core/unittest/CMakeLists.txt | 3 + internal/core/unittest/test_arrow_fs_c.cpp | 43 ++++++++++ .../core/unittest/test_column_groups_c.cpp | 47 +++++++++++ internal/core/unittest/test_packed_c.cpp | 84 +++++++++++++++++++ internal/storage/serde_events.go | 1 - internal/storage/serde_events_v2.go | 34 ++++---- internal/storage/serde_events_v2_test.go | 20 +++-- internal/storagev2/packed/packed_reader.go | 27 +++--- internal/storagev2/packed/packed_test.go | 33 +++++--- internal/storagev2/packed/packed_writer.go | 45 +++++++--- internal/storagev2/packed/util.go | 43 ++++++++++ internal/util/initcore/init_core.go | 64 ++++++++++++++ internal/util/initcore/init_core_test.go | 29 +++++++ 23 files changed, 686 insertions(+), 106 deletions(-) create mode 100644 internal/core/src/segcore/arrow_fs_c.cpp create mode 100644 internal/core/src/segcore/arrow_fs_c.h create mode 100644 internal/core/src/segcore/column_groups_c.cpp create mode 100644 internal/core/src/segcore/column_groups_c.h create mode 100644 internal/core/unittest/test_arrow_fs_c.cpp create mode 100644 internal/core/unittest/test_column_groups_c.cpp create mode 100644 internal/core/unittest/test_packed_c.cpp create mode 100644 internal/storagev2/packed/util.go diff --git a/internal/core/src/common/type_c.h b/internal/core/src/common/type_c.h index e6c6c8e8f6811..f7733cdd21db6 100644 --- a/internal/core/src/common/type_c.h +++ b/internal/core/src/common/type_c.h @@ -95,6 +95,7 @@ typedef struct CStorageConfig { bool useVirtualHost; int64_t requestTimeoutMs; const char* gcp_credential_json; + bool use_custom_part_upload; } CStorageConfig; typedef struct CMmapConfig { diff --git a/internal/core/src/segcore/arrow_fs_c.cpp b/internal/core/src/segcore/arrow_fs_c.cpp new file mode 100644 index 0000000000000..20ddb1b5f2339 --- /dev/null +++ b/internal/core/src/segcore/arrow_fs_c.cpp @@ -0,0 +1,74 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "segcore/arrow_fs_c.h" +#include "milvus-storage/filesystem/fs.h" +#include "common/EasyAssert.h" +#include "common/type_c.h" + +CStatus +InitLocalArrowFileSystemSingleton(const char* c_path) { + try { + std::string path(c_path); + milvus_storage::ArrowFileSystemConfig conf; + conf.root_path = path; + conf.storage_type = "local"; + milvus_storage::ArrowFileSystemSingleton::GetInstance().Init(conf); + + return milvus::SuccessCStatus(); + } catch (std::exception& e) { + return milvus::FailureCStatus(&e); + } +} + +void +CleanLocalArrowFileSystemSingleton() { + milvus_storage::ArrowFileSystemSingleton::GetInstance().Release(); +} + +CStatus +InitRemoteArrowFileSystemSingleton(CStorageConfig c_storage_config) { + try { + milvus_storage::ArrowFileSystemConfig conf; + conf.address = std::string(c_storage_config.address); + conf.bucket_name = std::string(c_storage_config.bucket_name); + conf.access_key_id = std::string(c_storage_config.access_key_id); + conf.access_key_value = std::string(c_storage_config.access_key_value); + conf.root_path = std::string(c_storage_config.root_path); + conf.storage_type = std::string(c_storage_config.storage_type); + conf.cloud_provider = std::string(c_storage_config.cloud_provider); + conf.iam_endpoint = std::string(c_storage_config.iam_endpoint); + conf.log_level = std::string(c_storage_config.log_level); + conf.region = std::string(c_storage_config.region); + conf.useSSL = c_storage_config.useSSL; + conf.sslCACert = std::string(c_storage_config.sslCACert); + conf.useIAM = c_storage_config.useIAM; + conf.useVirtualHost = c_storage_config.useVirtualHost; + conf.requestTimeoutMs = c_storage_config.requestTimeoutMs; + conf.gcp_credential_json = std::string(c_storage_config.gcp_credential_json); + conf.use_custom_part_upload = c_storage_config.use_custom_part_upload; + milvus_storage::ArrowFileSystemSingleton::GetInstance().Init(conf); + + return milvus::SuccessCStatus(); + } catch (std::exception& e) { + return milvus::FailureCStatus(&e); + } +} + +void +CleanRemoteArrowFileSystemSingleton() { + milvus_storage::ArrowFileSystemSingleton::GetInstance().Release(); +} diff --git a/internal/core/src/segcore/arrow_fs_c.h b/internal/core/src/segcore/arrow_fs_c.h new file mode 100644 index 0000000000000..61ad419c92072 --- /dev/null +++ b/internal/core/src/segcore/arrow_fs_c.h @@ -0,0 +1,40 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+#pragma once
+
+#include "common/type_c.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+CStatus
+InitLocalArrowFileSystemSingleton(const char* c_path);
+
+void
+CleanLocalArrowFileSystemSingleton();
+
+CStatus
+InitRemoteArrowFileSystemSingleton(CStorageConfig c_storage_config);
+
+void
+CleanRemoteArrowFileSystemSingleton();
+
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/internal/core/src/segcore/column_groups_c.cpp b/internal/core/src/segcore/column_groups_c.cpp
new file mode 100644
index 0000000000000..7d7c3c9452c8f
--- /dev/null
+++ b/internal/core/src/segcore/column_groups_c.cpp
@@ -0,0 +1,47 @@
+// Copyright 2025 Zilliz
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "segcore/column_groups_c.h"
+#include <memory>
+#include <utility>
+#include <vector>
+
+using VecVecInt = std::vector<std::vector<int>>;
+
+extern "C" {
+
+CColumnGroups NewCColumnGroups() {
+    auto vv = std::make_unique<VecVecInt>();
+    return vv.release();
+}
+
+void AddCColumnGroup(CColumnGroups cgs, int* group, int group_size) {
+    if (!cgs || !group)
+        return;
+
+    auto vv = static_cast<VecVecInt*>(cgs);
+    std::vector<int> new_group(group, group + group_size);
+    vv->emplace_back(std::move(new_group));
+}
+
+int CColumnGroupsSize(CColumnGroups cgs) {
+    if (!cgs)
+        return 0;
+
+    auto vv = static_cast<VecVecInt*>(cgs);
+    return static_cast<int>(vv->size());
+}
+
+void FreeCColumnGroups(CColumnGroups cgs) { delete static_cast<VecVecInt*>(cgs); }
+}
\ No newline at end of file
diff --git a/internal/core/src/segcore/column_groups_c.h b/internal/core/src/segcore/column_groups_c.h
new file mode 100644
index 0000000000000..fca407d417d5a
--- /dev/null
+++ b/internal/core/src/segcore/column_groups_c.h
@@ -0,0 +1,35 @@
+// Copyright 2025 Zilliz
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef void* CColumnGroups;
+
+CColumnGroups NewCColumnGroups();
+
+void AddCColumnGroup(CColumnGroups cgs, int* group, int group_size);
+
+int CColumnGroupsSize(CColumnGroups cgs);
+
+void FreeCColumnGroups(CColumnGroups cgs);
+
+#ifdef __cplusplus
+}
+#endif
\ No newline at end of file
diff --git a/internal/core/src/segcore/packed_reader_c.cpp b/internal/core/src/segcore/packed_reader_c.cpp
index 56aaf0e1977c5..faf9c466ffc88 100644
--- a/internal/core/src/segcore/packed_reader_c.cpp
+++ b/internal/core/src/segcore/packed_reader_c.cpp
@@ -22,33 +22,37 @@
 #include <arrow/c/bridge.h>
 #include <arrow/filesystem/filesystem.h>
 #include <arrow/status.h>
+#include "common/EasyAssert.h"
+#include "common/type_c.h"
 
-int
-NewPackedReader(const char* path,
+
+CStatus
+NewPackedReader(char** paths,
+                int64_t num_paths,
                 struct ArrowSchema* schema,
                 const int64_t buffer_size,
                 CPackedReader* c_packed_reader) {
     try {
-        auto truePath = std::string(path);
-        auto factory = std::make_shared<milvus_storage::FileSystemFactory>();
-        auto conf = milvus_storage::StorageConfig();
-        conf.uri = "file:///tmp/";
-        auto trueFs = factory->BuildFileSystem(conf, &truePath).value();
+        auto truePaths = std::vector<std::string>(paths, paths + num_paths);
+        auto trueFs = milvus_storage::ArrowFileSystemSingleton::GetInstance().GetArrowFileSystem();
+        if (!trueFs) {
+            return milvus::FailureCStatus(milvus::ErrorCode::FileReadFailed, "Failed to get filesystem");
+        }
         auto trueSchema = arrow::ImportSchema(schema).ValueOrDie();
         std::set<int> needed_columns;
         for (int i = 0; i < trueSchema->num_fields(); i++) {
             needed_columns.emplace(i);
         }
         auto reader = std::make_unique<milvus_storage::PackedRecordBatchReader>(
-            *trueFs, path, trueSchema, needed_columns, buffer_size);
+            trueFs, truePaths, trueSchema, needed_columns, buffer_size);
         *c_packed_reader = reader.release();
-        return 0;
+        return milvus::SuccessCStatus();
     } catch (std::exception& e) {
-        return -1;
+        return milvus::FailureCStatus(&e);
     }
 }
 
-int
+CStatus
 ReadNext(CPackedReader c_packed_reader,
          CArrowArray* out_array,
          CArrowSchema* out_schema) {
@@ -59,11 +63,11 @@ ReadNext(CPackedReader c_packed_reader,
         std::shared_ptr<arrow::RecordBatch> record_batch;
         auto status = packed_reader->ReadNext(&record_batch);
         if (!status.ok()) {
-            return -1;
+            return milvus::FailureCStatus(milvus::ErrorCode::FileReadFailed, status.ToString());
         }
         if (record_batch == nullptr) {
             // end of file
-            return 0;
+            return milvus::SuccessCStatus();
         } else {
             std::unique_ptr<ArrowArray> arr = std::make_unique<ArrowArray>();
             std::unique_ptr<ArrowSchema> schema =
                 std::make_unique<ArrowSchema>();
             auto status = arrow::ExportRecordBatch(
                 *record_batch, arr.get(), schema.get());
             if (!status.ok()) {
-                return -1;
+                return milvus::FailureCStatus(milvus::ErrorCode::FileReadFailed, status.ToString());
             }
             *out_array = arr.release();
             *out_schema = schema.release();
-            return 0;
+            return milvus::SuccessCStatus();
         }
-        return 0;
+        return milvus::SuccessCStatus();
     } catch (std::exception& e) {
-        return -1;
+        return milvus::FailureCStatus(&e);
     }
 }
 
-int
+CStatus
 CloseReader(CPackedReader c_packed_reader) {
     try {
         auto packed_reader = static_cast<milvus_storage::PackedRecordBatchReader*>(
             c_packed_reader);
         delete packed_reader;
-        return 0;
+        return milvus::SuccessCStatus();
     } catch (std::exception& e) {
-        return -1;
+        return milvus::FailureCStatus(&e);
     }
 }
\ No newline at end of file
diff --git a/internal/core/src/segcore/packed_reader_c.h b/internal/core/src/segcore/packed_reader_c.h
index 7a5c90cf16e3c..bad802303617f 100644
--- a/internal/core/src/segcore/packed_reader_c.h
+++ b/internal/core/src/segcore/packed_reader_c.h
@@ -18,6 +18,7 @@
 extern "C" {
 #endif
 
+#include "common/type_c.h"
 #include <arrow/c/abi.h>
 
 typedef void* CPackedReader;
@@ -32,8 +33,9 @@ typedef void* CArrowSchema;
  * @param buffer_size The max buffer size of the packed reader.
  * @param c_packed_reader The output pointer of the packed reader.
  */
-int
-NewPackedReader(const char* path,
+CStatus
+NewPackedReader(char** paths,
+                int64_t num_paths,
                 struct ArrowSchema* schema,
                 const int64_t buffer_size,
                 CPackedReader* c_packed_reader);
@@ -46,7 +48,7 @@ NewPackedReader(const char* path,
  * @param out_array The output pointer of the arrow array.
  * @param out_schema The output pointer of the arrow schema.
  */
-int
+CStatus
 ReadNext(CPackedReader c_packed_reader,
          CArrowArray* out_array,
          CArrowSchema* out_schema);
@@ -56,7 +58,7 @@ ReadNext(CPackedReader c_packed_reader,
  *
  * @param c_packed_reader The packed reader to close.
  */
-int
+CStatus
 CloseReader(CPackedReader c_packed_reader);
 
 #ifdef __cplusplus
diff --git a/internal/core/src/segcore/packed_writer_c.cpp b/internal/core/src/segcore/packed_writer_c.cpp
index 613e21d78013a..2b86602024690 100644
--- a/internal/core/src/segcore/packed_writer_c.cpp
+++ b/internal/core/src/segcore/packed_writer_c.cpp
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include "segcore/column_groups_c.h"
 #include "segcore/packed_writer_c.h"
 #include "milvus-storage/packed/writer.h"
 #include "milvus-storage/common/log.h"
@@ -20,30 +21,44 @@
 #include <arrow/c/bridge.h>
 #include <arrow/filesystem/filesystem.h>
 
+#include "common/EasyAssert.h"
+#include "common/type_c.h"
 
-int
-NewPackedWriter(const char* path,
-                struct ArrowSchema* schema,
+CStatus
+NewPackedWriter(struct ArrowSchema* schema,
                 const int64_t buffer_size,
+                char** paths,
+                int64_t num_paths,
+                int64_t part_upload_size,
+                CColumnGroups column_groups,
                 CPackedWriter* c_packed_writer) {
     try {
-        auto truePath = std::string(path);
-        auto factory = std::make_shared<milvus_storage::FileSystemFactory>();
+        auto truePaths = std::vector<std::string>(paths, paths + num_paths);
+
         auto conf = milvus_storage::StorageConfig();
-        conf.uri = "file:///tmp/";
-        auto trueFs = factory->BuildFileSystem(conf, &truePath).value();
+        conf.part_size = part_upload_size;
+
+        auto trueFs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
+                          .GetArrowFileSystem();
+        if (!trueFs) {
+            return milvus::FailureCStatus(milvus::ErrorCode::FileWriteFailed, "Failed to get filesystem");
+        }
+
         auto trueSchema = arrow::ImportSchema(schema).ValueOrDie();
+
+        auto columnGroups = *static_cast<std::vector<std::vector<int>>*>(column_groups);
+
         auto writer = std::make_unique<milvus_storage::PackedRecordBatchWriter>(
-            buffer_size, trueSchema, trueFs, truePath, conf);
+            trueFs, truePaths, trueSchema, conf, columnGroups, buffer_size);
 
         *c_packed_writer = writer.release();
-        return 0;
+        return milvus::SuccessCStatus();
     } catch (std::exception& e) {
-        return -1;
+        return milvus::FailureCStatus(&e);
     }
 }
 
-int
+CStatus
 WriteRecordBatch(CPackedWriter c_packed_writer,
                  struct ArrowArray* array,
                  struct ArrowSchema* schema) {
@@ -55,15 +70,15 @@ WriteRecordBatch(CPackedWriter c_packed_writer,
         arrow::ImportRecordBatch(array, schema).ValueOrDie();
         auto status = packed_writer->Write(record_batch);
         if (!status.ok()) {
-            return -1;
+            return milvus::FailureCStatus(milvus::ErrorCode::FileWriteFailed, status.ToString());
         }
-        return 0;
+        return milvus::SuccessCStatus();
     } catch (std::exception& e) {
-        return -1;
+        return milvus::FailureCStatus(&e);
     }
 }
 
-int
+CStatus
 CloseWriter(CPackedWriter c_packed_writer) {
     try {
         auto packed_writer =
             static_cast<milvus_storage::PackedRecordBatchWriter*>(c_packed_writer);
         auto status = packed_writer->Close();
         delete packed_writer;
         if (!status.ok()) {
-            return -1;
+            return milvus::FailureCStatus(milvus::ErrorCode::FileWriteFailed, status.ToString());
         }
-        return 0;
+        return milvus::SuccessCStatus();
     } catch (std::exception& e) {
-        return -1;
+        return milvus::FailureCStatus(&e);
     }
 }
\ No newline at end of file
diff --git a/internal/core/src/segcore/packed_writer_c.h b/internal/core/src/segcore/packed_writer_c.h
index 207aba502d468..5f59a91002fda 100644
--- a/internal/core/src/segcore/packed_writer_c.h
+++ b/internal/core/src/segcore/packed_writer_c.h
@@ -18,22 +18,27 @@
 extern "C" {
 #endif
 
+#include "common/type_c.h"
 #include <arrow/c/abi.h>
+#include "segcore/column_groups_c.h"
 
 typedef void* CPackedWriter;
 
-int
-NewPackedWriter(const char* path,
-                struct ArrowSchema* schema,
+CStatus
+NewPackedWriter(struct ArrowSchema* schema,
                 const int64_t buffer_size,
+                char** paths,
+                int64_t num_paths,
+                int64_t part_upload_size,
+                CColumnGroups column_groups,
                 CPackedWriter* c_packed_writer);
 
-int
+CStatus
 WriteRecordBatch(CPackedWriter c_packed_writer,
                  struct ArrowArray* array,
                  struct ArrowSchema* schema);
 
-int
+CStatus
 CloseWriter(CPackedWriter c_packed_writer);
 
 #ifdef __cplusplus
diff --git a/internal/core/thirdparty/milvus-storage/CMakeLists.txt b/internal/core/thirdparty/milvus-storage/CMakeLists.txt
index a847c41a47597..399e2796a96b3 100644
--- a/internal/core/thirdparty/milvus-storage/CMakeLists.txt
+++ b/internal/core/thirdparty/milvus-storage/CMakeLists.txt
@@ -14,7 +14,7 @@
 # Update milvus-storage_VERSION for the first occurrence
 milvus_add_pkg_config("milvus-storage")
 set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "")
-set( milvus-storage_VERSION 7475494 )
+set( milvus-storage_VERSION 26992ec )
 set( GIT_REPOSITORY  "https://github.com/milvus-io/milvus-storage.git")
 message(STATUS "milvus-storage repo: ${GIT_REPOSITORY}")
 message(STATUS "milvus-storage version: ${milvus-storage_VERSION}")
diff --git a/internal/core/unittest/CMakeLists.txt b/internal/core/unittest/CMakeLists.txt
index 95bc9c983358c..b513fc7f99eb0 100644
--- a/internal/core/unittest/CMakeLists.txt
+++ b/internal/core/unittest/CMakeLists.txt
@@ -24,6 +24,9 @@ add_definitions(-DMILVUS_TEST_SEGCORE_YAML_PATH="${CMAKE_SOURCE_DIR}/unittest/te
 # TODO: better to use ls/find pattern
 set(MILVUS_TEST_FILES
     init_gtest.cpp
+    test_packed_c.cpp
+    test_arrow_fs_c.cpp
+    test_column_groups_c.cpp
     test_always_true_expr.cpp
     test_array_bitmap_index.cpp
     test_array_inverted_index.cpp
diff --git a/internal/core/unittest/test_arrow_fs_c.cpp b/internal/core/unittest/test_arrow_fs_c.cpp
new file mode 100644
index 0000000000000..0452916ab29dd
--- /dev/null
+++ b/internal/core/unittest/test_arrow_fs_c.cpp
@@ -0,0 +1,43 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <gtest/gtest.h>
+#include "segcore/arrow_fs_c.h"
+
+TEST(ArrowFileSystemSingleton, LocalArrowFileSystemSingleton) {
+    const char* path = "/tmp";
+    CStatus status = InitLocalArrowFileSystemSingleton(path);
+    EXPECT_EQ(status.error_code, 0);
+
+    CleanLocalArrowFileSystemSingleton();
+}
+
+TEST(ArrowFileSystemSingleton, RemoteArrowFileSystem) {
+    CStorageConfig config = {};  // zero-init so unset fields are not garbage
+    config.address = "https://test-oss-0815.oss-cn-hangzhou.aliyuncs.com";
+    config.storage_type = "remote";
+    config.bucket_name = "test-bucket";
+    config.access_key_id = "";
+    config.access_key_value = "";
+    config.cloud_provider = "aliyun";
+    config.region = "oss-cn-hangzhou";
+    config.use_custom_part_upload = true;
+
+    CStatus status = InitRemoteArrowFileSystemSingleton(config);
+    EXPECT_EQ(status.error_code, 0);
+
+    CleanRemoteArrowFileSystemSingleton();
+}
diff --git a/internal/core/unittest/test_column_groups_c.cpp b/internal/core/unittest/test_column_groups_c.cpp
new file mode 100644
index 0000000000000..d32852bbe27d1
--- /dev/null
+++ b/internal/core/unittest/test_column_groups_c.cpp
@@ -0,0 +1,47 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <gtest/gtest.h>
+#include <memory>
+#include <vector>
+#include "segcore/column_groups_c.h"
+
+
+TEST(CColumnGroups, TestCColumnGroups) {
+    CColumnGroups cgs = NewCColumnGroups();
+    int group1[] = {2, 4, 5};
+    int group2[] = {0, 1};
+    int group3[] = {3, 6, 7, 8};
+
+    int* test_groups[] = {group1, group2, group3};
+    int group_sizes[] = {3, 2, 4};
+
+    for (int i = 0; i < 3; i++) {
+        AddCColumnGroup(cgs, test_groups[i], group_sizes[i]);
+    }
+
+    ASSERT_EQ(CColumnGroupsSize(cgs), 3);
+    auto vv = static_cast<std::vector<std::vector<int>>*>(cgs);
+
+    for (int i = 0; i < 3; i++) {
+        ASSERT_EQ(vv->at(i).size(), group_sizes[i]);
+        for (int j = 0; j < group_sizes[i]; j++) {
+            EXPECT_EQ(vv->at(i)[j], test_groups[i][j]);
+        }
+    }
+
+    FreeCColumnGroups(cgs);
+}
\ No newline at end of file
diff --git a/internal/core/unittest/test_packed_c.cpp b/internal/core/unittest/test_packed_c.cpp
new file mode 100644
index 0000000000000..93217ddc58e48
--- /dev/null
+++ b/internal/core/unittest/test_packed_c.cpp
@@ -0,0 +1,84 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <gtest/gtest.h>
+#include "segcore/packed_writer_c.h"
+#include "segcore/packed_reader_c.h"
+#include "segcore/arrow_fs_c.h"
+#include <arrow/array/builder_primitive.h>
+#include <arrow/c/bridge.h>
+#include <arrow/record_batch.h>
+#include <arrow/type.h>
+#include <arrow/array/array_primitive.h>
+#include "arrow/table_builder.h"
+#include "arrow/type_fwd.h"
+#include <memory>
+#include <numeric>
+
+TEST(CPackedTest, PackedWriterAndReader) {
+    std::vector<int64_t> test_data(5);
+    std::iota(test_data.begin(), test_data.end(), 0);
+
+    auto builder = std::make_shared<arrow::Int64Builder>();
+    auto status = builder->AppendValues(test_data.begin(), test_data.end());
+    ASSERT_TRUE(status.ok());
+    auto res = builder->Finish();
+    ASSERT_TRUE(res.ok());
+    std::shared_ptr<arrow::Array> array = res.ValueOrDie();
+
+    auto schema = arrow::schema({arrow::field("int64", arrow::int64())});
+    auto batch = arrow::RecordBatch::Make(schema, array->length(), {array});
+
+    struct ArrowSchema c_write_schema;
+    ASSERT_TRUE(arrow::ExportSchema(*schema, &c_write_schema).ok());
+
+    const int64_t buffer_size = 10 * 1024 * 1024;
+    char* path = const_cast<char*>("/tmp");
+    char* paths[] = {const_cast<char*>("/tmp/0")};
+    int64_t part_upload_size = 0;
+
+    CColumnGroups cgs = NewCColumnGroups();
+    int group[] = {0};
+    AddCColumnGroup(cgs, group, 1);
+
+    auto c_status = InitLocalArrowFileSystemSingleton(path);
+    EXPECT_EQ(c_status.error_code, 0);
+    CPackedWriter c_packed_writer = nullptr;
+    c_status = NewPackedWriter(&c_write_schema, buffer_size, paths, 1, part_upload_size, cgs, &c_packed_writer);
+    EXPECT_EQ(c_status.error_code, 0);
+    EXPECT_NE(c_packed_writer, nullptr);
+
+    struct ArrowArray carray;
+    struct ArrowSchema cschema;
+    ASSERT_TRUE(arrow::ExportRecordBatch(*batch, &carray, &cschema).ok());
+
+    c_status = WriteRecordBatch(c_packed_writer, &carray, &cschema);
+    EXPECT_EQ(c_status.error_code, 0);
+
+    c_status = CloseWriter(c_packed_writer);
+    EXPECT_EQ(c_status.error_code, 0);
+
+    struct ArrowSchema c_read_schema;
+    ASSERT_TRUE(arrow::ExportSchema(*schema, &c_read_schema).ok());
+    CPackedReader c_packed_reader = nullptr;
+    c_status = NewPackedReader(paths, 1, &c_read_schema, buffer_size, &c_packed_reader);
+    EXPECT_EQ(c_status.error_code, 0);
+    EXPECT_NE(c_packed_reader, nullptr);
+
+    c_status = CloseReader(c_packed_reader);
+    EXPECT_EQ(c_status.error_code, 0);
+    FreeCColumnGroups(cgs);
+}
\ No newline at end of file
diff --git a/internal/storage/serde_events.go b/internal/storage/serde_events.go
index 9373843879d65..8ed72accb9c94 100644
--- a/internal/storage/serde_events.go
+++ b/internal/storage/serde_events.go
@@ -424,7 +424,6 @@ func ValueSerializer(v []*Value, fieldSchema []*schemapb.FieldSchema) (Record, e
 		fields[i] = arrow.Field{
 			Name:     field.Name,
 			Type:     arrays[i].DataType(),
-			Nullable: true, // No nullable check here.
Metadata: arrow.NewMetadata([]string{"FieldID"}, []string{strconv.Itoa(int(field.FieldID))}), } field2Col[field.FieldID] = i diff --git a/internal/storage/serde_events_v2.go b/internal/storage/serde_events_v2.go index abb068e540e77..cafd13f12e62a 100644 --- a/internal/storage/serde_events_v2.go +++ b/internal/storage/serde_events_v2.go @@ -31,8 +31,7 @@ import ( type packedRecordReader struct { reader *packed.PackedReader - bufferSize int - path string + bufferSize int64 schema *schemapb.CollectionSchema r *simpleArrowRecord field2Col map[FieldID]int @@ -61,13 +60,13 @@ func (pr *packedRecordReader) Close() error { return nil } -func NewPackedRecordReader(path string, schema *schemapb.CollectionSchema, bufferSize int, +func NewPackedRecordReader(paths []string, schema *schemapb.CollectionSchema, bufferSize int64, ) (*packedRecordReader, error) { arrowSchema, err := ConvertToArrowSchema(schema.Fields) if err != nil { return nil, merr.WrapErrParameterInvalid("convert collection schema [%s] to arrow schema error: %s", schema.Name, err.Error()) } - reader, err := packed.NewPackedReader(path, arrowSchema, bufferSize) + reader, err := packed.NewPackedReader(paths, arrowSchema, bufferSize) if err != nil { return nil, merr.WrapErrParameterInvalid("New binlog record packed reader error: %s", err.Error()) } @@ -79,15 +78,14 @@ func NewPackedRecordReader(path string, schema *schemapb.CollectionSchema, buffe reader: reader, schema: schema, bufferSize: bufferSize, - path: path, field2Col: field2Col, }, nil } -func NewPackedDeserializeReader(path string, schema *schemapb.CollectionSchema, - bufferSize int, pkFieldID FieldID, +func NewPackedDeserializeReader(paths []string, schema *schemapb.CollectionSchema, + bufferSize int64, pkFieldID FieldID, ) (*DeserializeReader[*Value], error) { - reader, err := NewPackedRecordReader(path, schema, bufferSize) + reader, err := NewPackedRecordReader(paths, schema, bufferSize) if err != nil { return nil, err } @@ -149,9 +147,11 @@ var _ RecordWriter = (*packedRecordWriter)(nil) type packedRecordWriter struct { writer *packed.PackedWriter - bufferSize int - path string - schema *arrow.Schema + bufferSize int64 + multiPartUploadSize int64 + columnGroups [][]int + paths []string + schema *arrow.Schema numRows int writtenUncompressed uint64 @@ -181,8 +181,8 @@ func (pw *packedRecordWriter) Close() error { return nil } -func NewPackedRecordWriter(path string, schema *arrow.Schema, bufferSize int) (*packedRecordWriter, error) { - writer, err := packed.NewPackedWriter(path, schema, bufferSize) +func NewPackedRecordWriter(paths []string, schema *arrow.Schema, bufferSize int64, multiPartUploadSize int64, columnGroups [][]int) (*packedRecordWriter, error) { + writer, err := packed.NewPackedWriter(paths, schema, bufferSize, multiPartUploadSize, columnGroups) if err != nil { return nil, merr.WrapErrServiceInternal( fmt.Sprintf("can not new packed record writer %s", err.Error())) @@ -191,19 +191,17 @@ func NewPackedRecordWriter(path string, schema *arrow.Schema, bufferSize int) (* writer: writer, schema: schema, bufferSize: bufferSize, - path: path, + paths: paths, }, nil } -func NewPackedSerializeWriter(schema *schemapb.CollectionSchema, partitionID, segmentID UniqueID, - batchSize int, path string, bufferSize int, -) (*SerializeWriter[*Value], error) { +func NewPackedSerializeWriter(paths []string, schema *schemapb.CollectionSchema, bufferSize int64, multiPartUploadSize int64, columnGroups [][]int, batchSize int) (*SerializeWriter[*Value], error) { arrowSchema, err := 
ConvertToArrowSchema(schema.Fields) if err != nil { return nil, merr.WrapErrServiceInternal( fmt.Sprintf("can not convert collection schema %s to arrow schema: %s", schema.Name, err.Error())) } - packedRecordWriter, err := NewPackedRecordWriter(path, arrowSchema, bufferSize) + packedRecordWriter, err := NewPackedRecordWriter(paths, arrowSchema, bufferSize, multiPartUploadSize, columnGroups) if err != nil { return nil, merr.WrapErrServiceInternal( fmt.Sprintf("can not new packed record writer %s", err.Error())) diff --git a/internal/storage/serde_events_v2_test.go b/internal/storage/serde_events_v2_test.go index f63d35701adb1..491b860e6f98b 100644 --- a/internal/storage/serde_events_v2_test.go +++ b/internal/storage/serde_events_v2_test.go @@ -21,11 +21,13 @@ import ( "github.com/stretchr/testify/assert" + "github.com/milvus-io/milvus/internal/util/initcore" "github.com/milvus-io/milvus/pkg/common" ) func TestPackedSerde(t *testing.T) { - t.Run("test binlog packed deserialize reader", func(t *testing.T) { + t.Run("test binlog packed serde v2", func(t *testing.T) { + initcore.InitLocalArrowFileSystem("/tmp") size := 10 blobs, err := generateTestData(size) @@ -35,11 +37,17 @@ func TestPackedSerde(t *testing.T) { assert.NoError(t, err) defer reader.Close() - path := "/tmp" - bufferSize := 10 * 1024 * 1024 // 10MB + paths := []string{"/tmp/0"} + bufferSize := int64(10 * 1024 * 1024) // 10MB schema := generateTestSchema() - - writer, err := NewPackedSerializeWriter(schema, 0, 0, 7, path, bufferSize) + group := []int{} + for i := 0; i < len(schema.Fields); i++ { + group = append(group, i) + } + columnGroups := [][]int{group} + multiPartUploadSize := int64(0) + batchSize := 7 + writer, err := NewPackedSerializeWriter(paths, schema, bufferSize, multiPartUploadSize, columnGroups, batchSize) assert.NoError(t, err) for i := 1; i <= size; i++ { @@ -54,7 +62,7 @@ func TestPackedSerde(t *testing.T) { err = writer.Close() assert.NoError(t, err) - reader, err = NewPackedDeserializeReader(path, schema, bufferSize, common.RowIDField) + reader, err = NewPackedDeserializeReader(paths, schema, bufferSize, common.RowIDField) assert.NoError(t, err) defer reader.Close() diff --git a/internal/storagev2/packed/packed_reader.go b/internal/storagev2/packed/packed_reader.go index c47b443af138e..8566b3747ab92 100644 --- a/internal/storagev2/packed/packed_reader.go +++ b/internal/storagev2/packed/packed_reader.go @@ -30,22 +30,27 @@ import ( "github.com/apache/arrow/go/v12/arrow" "github.com/apache/arrow/go/v12/arrow/cdata" - "github.com/cockroachdb/errors" ) -func NewPackedReader(path string, schema *arrow.Schema, bufferSize int) (*PackedReader, error) { +func NewPackedReader(filePaths []string, schema *arrow.Schema, bufferSize int64) (*PackedReader, error) { + cFilePaths := make([]*C.char, len(filePaths)) + for i, path := range filePaths { + cFilePaths[i] = C.CString(path) + defer C.free(unsafe.Pointer(cFilePaths[i])) + } + cFilePathsArray := (**C.char)(unsafe.Pointer(&cFilePaths[0])) + cNumPaths := C.int64_t(len(filePaths)) + var cas cdata.CArrowSchema cdata.ExportArrowSchema(schema, &cas) cSchema := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas)) - cPath := C.CString(path) - defer C.free(unsafe.Pointer(cPath)) cBufferSize := C.int64_t(bufferSize) var cPackedReader C.CPackedReader - status := C.NewPackedReader(cPath, cSchema, cBufferSize, &cPackedReader) - if status != 0 { - return nil, fmt.Errorf("failed to new packed reader: %s, status: %d", path, status) + status := C.NewPackedReader(cFilePathsArray, cNumPaths, 
cSchema, cBufferSize, &cPackedReader) + if err := ConsumeCStatusIntoError(&status); err != nil { + return nil, err } return &PackedReader{cPackedReader: cPackedReader, schema: schema}, nil } @@ -54,8 +59,8 @@ func (pr *PackedReader) ReadNext() (arrow.Record, error) { var cArr C.CArrowArray var cSchema C.CArrowSchema status := C.ReadNext(pr.cPackedReader, &cArr, &cSchema) - if status != 0 { - return nil, fmt.Errorf("ReadNext failed with error code %d", status) + if err := ConsumeCStatusIntoError(&status); err != nil { + return nil, err } if cArr == nil { @@ -76,8 +81,8 @@ func (pr *PackedReader) ReadNext() (arrow.Record, error) { func (pr *PackedReader) Close() error { status := C.CloseReader(pr.cPackedReader) - if status != 0 { - return errors.New("PackedReader: failed to close file") + if err := ConsumeCStatusIntoError(&status); err != nil { + return err } return nil } diff --git a/internal/storagev2/packed/packed_test.go b/internal/storagev2/packed/packed_test.go index 3043e0a80fbd2..04dda6dbdb1b7 100644 --- a/internal/storagev2/packed/packed_test.go +++ b/internal/storagev2/packed/packed_test.go @@ -22,6 +22,9 @@ import ( "github.com/apache/arrow/go/v12/arrow/memory" "github.com/stretchr/testify/suite" "golang.org/x/exp/rand" + + "github.com/milvus-io/milvus/internal/util/initcore" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) func TestPackedReadAndWrite(t *testing.T) { @@ -30,11 +33,17 @@ func TestPackedReadAndWrite(t *testing.T) { type PackedTestSuite struct { suite.Suite - schema *arrow.Schema - rec arrow.Record + schema *arrow.Schema + rec arrow.Record + localDataRootPath string +} + +func (suite *PackedTestSuite) SetupSuite() { + paramtable.Init() } func (suite *PackedTestSuite) SetupTest() { + initcore.InitLocalArrowFileSystem("/tmp") schema := arrow.NewSchema([]arrow.Field{ {Name: "a", Type: arrow.PrimitiveTypes.Int32}, {Name: "b", Type: arrow.PrimitiveTypes.Int64}, @@ -67,9 +76,11 @@ func (suite *PackedTestSuite) SetupTest() { func (suite *PackedTestSuite) TestPackedOneFile() { batches := 100 - path := "/tmp" - bufferSize := 10 * 1024 * 1024 // 10MB - pw, err := NewPackedWriter(path, suite.schema, bufferSize) + paths := []string{"/tmp/100"} + columnGroups := [][]int{{0, 1, 2}} + bufferSize := int64(10 * 1024 * 1024) // 10MB + multiPartUploadSize := int64(0) + pw, err := NewPackedWriter(paths, suite.schema, bufferSize, multiPartUploadSize, columnGroups) suite.NoError(err) for i := 0; i < batches; i++ { err = pw.WriteRecordBatch(suite.rec) @@ -78,7 +89,7 @@ func (suite *PackedTestSuite) TestPackedOneFile() { err = pw.Close() suite.NoError(err) - reader, err := NewPackedReader(path, suite.schema, bufferSize) + reader, err := NewPackedReader(paths, suite.schema, bufferSize) suite.NoError(err) rr, err := reader.ReadNext() suite.NoError(err) @@ -117,9 +128,11 @@ func (suite *PackedTestSuite) TestPackedMultiFiles() { } rec := b.NewRecord() defer rec.Release() - path := "/tmp" - bufferSize := 10 * 1024 * 1024 // 10MB - pw, err := NewPackedWriter(path, suite.schema, bufferSize) + paths := []string{"/tmp/100", "/tmp/101"} + columnGroups := [][]int{{2}, {0, 1}} + bufferSize := int64(10 * 1024 * 1024) // 10MB + multiPartUploadSize := int64(0) + pw, err := NewPackedWriter(paths, suite.schema, bufferSize, multiPartUploadSize, columnGroups) suite.NoError(err) for i := 0; i < batches; i++ { err = pw.WriteRecordBatch(rec) @@ -128,7 +141,7 @@ func (suite *PackedTestSuite) TestPackedMultiFiles() { err = pw.Close() suite.NoError(err) - reader, err := NewPackedReader(path, suite.schema, 
bufferSize)
+	reader, err := NewPackedReader(paths, suite.schema, bufferSize)
 	suite.NoError(err)
 	var rows int64 = 0
 	var rr arrow.Record
diff --git a/internal/storagev2/packed/packed_writer.go b/internal/storagev2/packed/packed_writer.go
index bca82da0a1cf9..ab6d107c2068e 100644
--- a/internal/storagev2/packed/packed_writer.go
+++ b/internal/storagev2/packed/packed_writer.go
@@ -19,6 +19,7 @@ package packed
 
 #include <stdlib.h>
 #include "segcore/packed_writer_c.h"
+#include "segcore/column_groups_c.h"
 #include "arrow/c/abi.h"
 #include "arrow/c/helpers.h"
 */
@@ -30,23 +31,43 @@ import (
 
 	"github.com/apache/arrow/go/v12/arrow"
 	"github.com/apache/arrow/go/v12/arrow/cdata"
-	"github.com/cockroachdb/errors"
 )
 
-func NewPackedWriter(path string, schema *arrow.Schema, bufferSize int) (*PackedWriter, error) {
+func NewPackedWriter(filePaths []string, schema *arrow.Schema, bufferSize int64, multiPartUploadSize int64, columnGroups [][]int) (*PackedWriter, error) {
+	cFilePaths := make([]*C.char, len(filePaths))
+	for i, path := range filePaths {
+		cFilePaths[i] = C.CString(path)
+		defer C.free(unsafe.Pointer(cFilePaths[i]))
+	}
+	cFilePathsArray := (**C.char)(unsafe.Pointer(&cFilePaths[0]))
+	cNumPaths := C.int64_t(len(filePaths))
+
 	var cas cdata.CArrowSchema
 	cdata.ExportArrowSchema(schema, &cas)
 	cSchema := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas))
 
-	cPath := C.CString(path)
-	defer C.free(unsafe.Pointer(cPath))
-
 	cBufferSize := C.int64_t(bufferSize)
 
+	cMultiPartUploadSize := C.int64_t(multiPartUploadSize)
+
+	cColumnGroups := C.NewCColumnGroups()
+	for _, group := range columnGroups {
+		cGroup := C.malloc(C.size_t(len(group)) * C.size_t(unsafe.Sizeof(C.int(0))))
+		if cGroup == nil {
+			return nil, fmt.Errorf("failed to allocate memory for column groups")
+		}
+		cGroupSlice := (*[1 << 30]C.int)(cGroup)[:len(group):len(group)]
+		for i, val := range group {
+			cGroupSlice[i] = C.int(val)
+		}
+		C.AddCColumnGroup(cColumnGroups, (*C.int)(cGroup), C.int(len(group)))
+		C.free(cGroup)
+	}
+
 	var cPackedWriter C.CPackedWriter
-	status := C.NewPackedWriter(cPath, cSchema, cBufferSize, &cPackedWriter)
-	if status != 0 {
-		return nil, fmt.Errorf("failed to new packed writer: %s, status: %d", path, status)
+	status := C.NewPackedWriter(cSchema, cBufferSize, cFilePathsArray, cNumPaths, cMultiPartUploadSize, cColumnGroups, &cPackedWriter)
+	if err := ConsumeCStatusIntoError(&status); err != nil {
+		return nil, err
 	}
 	return &PackedWriter{cPackedWriter: cPackedWriter}, nil
 }
@@ -61,8 +82,8 @@ func (pw *PackedWriter) WriteRecordBatch(recordBatch arrow.Record) error {
 	cSchema := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas))
 
 	status := C.WriteRecordBatch(pw.cPackedWriter, cArr, cSchema)
-	if status != 0 {
-		return errors.New("PackedWriter: failed to write record batch")
+	if err := ConsumeCStatusIntoError(&status); err != nil {
+		return err
 	}
 
 	return nil
@@ -70,8 +91,8 @@ func (pw *PackedWriter) WriteRecordBatch(recordBatch arrow.Record) error {
 
 func (pw *PackedWriter) Close() error {
 	status := C.CloseWriter(pw.cPackedWriter)
-	if status != 0 {
-		return errors.New("PackedWriter: failed to close file")
+	if err := ConsumeCStatusIntoError(&status); err != nil {
+		return err
 	}
 	return nil
 }
diff --git a/internal/storagev2/packed/util.go b/internal/storagev2/packed/util.go
new file mode 100644
index 0000000000000..c96e5d8e22f8c
--- /dev/null
+++ b/internal/storagev2/packed/util.go
@@ -0,0 +1,43 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package packed + +/* +#cgo pkg-config: milvus_core + +#include "common/type_c.h" +#include "common/protobuf_utils_c.h" +#include "segcore/segment_c.h" +#include "storage/storage_c.h" +*/ +import "C" + +import ( + "unsafe" + + "github.com/milvus-io/milvus/pkg/util/merr" +) + +func ConsumeCStatusIntoError(status *C.CStatus) error { + if status == nil || status.error_code == 0 { + return nil + } + errorCode := status.error_code + errorMsg := C.GoString(status.error_msg) + C.free(unsafe.Pointer(status.error_msg)) + return merr.SegcoreError(int32(errorCode), errorMsg) +} diff --git a/internal/util/initcore/init_core.go b/internal/util/initcore/init_core.go index 88a4f4ae5aa15..520673f3abe4f 100644 --- a/internal/util/initcore/init_core.go +++ b/internal/util/initcore/init_core.go @@ -24,6 +24,7 @@ package initcore #include "common/init_c.h" #include "segcore/segcore_init_c.h" #include "storage/storage_c.h" +#include "segcore/arrow_fs_c.h" */ import "C" @@ -126,6 +127,69 @@ func callWithTimeout(fn func(), timeoutHandler func(), timeout time.Duration) { } } +func InitStorageV2FileSystem(params *paramtable.ComponentParam) error { + if params.CommonCfg.StorageType.GetValue() == "local" { + return InitLocalArrowFileSystem(params.LocalStorageCfg.Path.GetValue()) + } + return InitRemoteArrowFileSystem(params) +} + +func InitLocalArrowFileSystem(path string) error { + CLocalRootPath := C.CString(path) + defer C.free(unsafe.Pointer(CLocalRootPath)) + status := C.InitLocalArrowFileSystemSingleton(CLocalRootPath) + return HandleCStatus(&status, "InitLocalArrowFileSystemSingleton failed") +} + +func InitRemoteArrowFileSystem(params *paramtable.ComponentParam) error { + cAddress := C.CString(params.MinioCfg.Address.GetValue()) + cBucketName := C.CString(params.MinioCfg.BucketName.GetValue()) + cAccessKey := C.CString(params.MinioCfg.AccessKeyID.GetValue()) + cAccessValue := C.CString(params.MinioCfg.SecretAccessKey.GetValue()) + cRootPath := C.CString(params.MinioCfg.RootPath.GetValue()) + cStorageType := C.CString(params.CommonCfg.StorageType.GetValue()) + cIamEndPoint := C.CString(params.MinioCfg.IAMEndpoint.GetValue()) + cCloudProvider := C.CString(params.MinioCfg.CloudProvider.GetValue()) + cLogLevel := C.CString(params.MinioCfg.LogLevel.GetValue()) + cRegion := C.CString(params.MinioCfg.Region.GetValue()) + cSslCACert := C.CString(params.MinioCfg.SslCACert.GetValue()) + cGcpCredentialJSON := C.CString(params.MinioCfg.GcpCredentialJSON.GetValue()) + defer C.free(unsafe.Pointer(cAddress)) + defer C.free(unsafe.Pointer(cBucketName)) + defer C.free(unsafe.Pointer(cAccessKey)) + defer C.free(unsafe.Pointer(cAccessValue)) + defer C.free(unsafe.Pointer(cRootPath)) + defer C.free(unsafe.Pointer(cStorageType)) + defer C.free(unsafe.Pointer(cIamEndPoint)) + defer C.free(unsafe.Pointer(cLogLevel)) + defer 
C.free(unsafe.Pointer(cRegion))
+	defer C.free(unsafe.Pointer(cCloudProvider))
+	defer C.free(unsafe.Pointer(cSslCACert))
+	defer C.free(unsafe.Pointer(cGcpCredentialJSON))
+	storageConfig := C.CStorageConfig{
+		address:                cAddress,
+		bucket_name:            cBucketName,
+		access_key_id:          cAccessKey,
+		access_key_value:       cAccessValue,
+		root_path:              cRootPath,
+		storage_type:           cStorageType,
+		iam_endpoint:           cIamEndPoint,
+		cloud_provider:         cCloudProvider,
+		useSSL:                 C.bool(params.MinioCfg.UseSSL.GetAsBool()),
+		sslCACert:              cSslCACert,
+		useIAM:                 C.bool(params.MinioCfg.UseIAM.GetAsBool()),
+		log_level:              cLogLevel,
+		region:                 cRegion,
+		useVirtualHost:         C.bool(params.MinioCfg.UseVirtualHost.GetAsBool()),
+		requestTimeoutMs:       C.int64_t(params.MinioCfg.RequestTimeoutMs.GetAsInt64()),
+		gcp_credential_json:    cGcpCredentialJSON,
+		use_custom_part_upload: true,
+	}
+
+	status := C.InitRemoteArrowFileSystemSingleton(storageConfig)
+	return HandleCStatus(&status, "InitRemoteArrowFileSystemSingleton failed")
+}
+
 func InitRemoteChunkManager(params *paramtable.ComponentParam) error {
 	cAddress := C.CString(params.MinioCfg.Address.GetValue())
 	cBucketName := C.CString(params.MinioCfg.BucketName.GetValue())
diff --git a/internal/util/initcore/init_core_test.go b/internal/util/initcore/init_core_test.go
index 57f6f45bd9f40..25eefa8355836 100644
--- a/internal/util/initcore/init_core_test.go
+++ b/internal/util/initcore/init_core_test.go
@@ -45,3 +45,32 @@ func TestOtlpHang(t *testing.T) {
 		ResetTraceConfig(paramtable.Get())
 	})
 }
+
+func TestInitStorageV2FileSystem(t *testing.T) {
+	// init local storage
+	paramtable.Init()
+	paramtable.Get().Save(paramtable.Get().CommonCfg.StorageType.Key, "local")
+	paramtable.Get().Save(paramtable.Get().LocalStorageCfg.Path.Key, "/tmp")
+	err := InitStorageV2FileSystem(paramtable.Get())
+	assert.NoError(t, err)
+
+	// init remote storage
+	paramtable.Get().Save(paramtable.Get().MinioCfg.Address.Key, "oss-cn-hangzhou.aliyuncs.com")
+	paramtable.Get().Save(paramtable.Get().MinioCfg.BucketName.Key, "test-oss-0815")
+	paramtable.Get().Save(paramtable.Get().MinioCfg.AccessKeyID.Key, "test")
+	paramtable.Get().Save(paramtable.Get().MinioCfg.SecretAccessKey.Key, "test")
+	paramtable.Get().Save(paramtable.Get().MinioCfg.RootPath.Key, "test")
+	paramtable.Get().Save(paramtable.Get().CommonCfg.StorageType.Key, "remote")
+	paramtable.Get().Save(paramtable.Get().MinioCfg.CloudProvider.Key, "aliyun")
+	paramtable.Get().Save(paramtable.Get().MinioCfg.IAMEndpoint.Key, "")
+	paramtable.Get().Save(paramtable.Get().MinioCfg.LogLevel.Key, "warn")
+	paramtable.Get().Save(paramtable.Get().MinioCfg.Region.Key, "oss-cn-hangzhou")
+	paramtable.Get().Save(paramtable.Get().MinioCfg.UseSSL.Key, "false")
+	paramtable.Get().Save(paramtable.Get().MinioCfg.SslCACert.Key, "")
+	paramtable.Get().Save(paramtable.Get().MinioCfg.UseIAM.Key, "false")
+	paramtable.Get().Save(paramtable.Get().MinioCfg.UseVirtualHost.Key, "false")
+	paramtable.Get().Save(paramtable.Get().MinioCfg.RequestTimeoutMs.Key, "10000")
+
+	err = InitStorageV2FileSystem(paramtable.Get())
+	assert.NoError(t, err)
+}
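---

Usage note (not part of the diff): the sketch below shows how the pieces added here fit together from the Go side, mirroring packed_test.go above. It is a minimal sketch, not an authoritative API reference — the /tmp paths, the column-group layout, and the writeAndRead helper name are illustrative only, and the packed and initcore packages are internal to Milvus.

package example

import (
	"github.com/apache/arrow/go/v12/arrow"

	"github.com/milvus-io/milvus/internal/storagev2/packed"
	"github.com/milvus-io/milvus/internal/util/initcore"
)

// writeAndRead assumes schema has three fields, matching the {2}, {0, 1}
// column-group split used by TestPackedMultiFiles in this patch.
func writeAndRead(rec arrow.Record, schema *arrow.Schema) error {
	// The Arrow filesystem singleton must be initialized once before any
	// packed writer or reader is created; a remote deployment would call
	// initcore.InitRemoteArrowFileSystem(params) instead.
	if err := initcore.InitLocalArrowFileSystem("/tmp"); err != nil {
		return err
	}

	// One output file per column group: column 2 goes to the first file,
	// columns 0 and 1 to the second.
	paths := []string{"/tmp/100", "/tmp/101"}
	columnGroups := [][]int{{2}, {0, 1}}
	bufferSize := int64(10 * 1024 * 1024) // 10MB write buffer
	multiPartUploadSize := int64(0)       // part size for remote multipart uploads

	pw, err := packed.NewPackedWriter(paths, schema, bufferSize, multiPartUploadSize, columnGroups)
	if err != nil {
		return err
	}
	if err := pw.WriteRecordBatch(rec); err != nil {
		return err
	}
	if err := pw.Close(); err != nil {
		return err
	}

	pr, err := packed.NewPackedReader(paths, schema, bufferSize)
	if err != nil {
		return err
	}
	defer pr.Close()
	for {
		batch, err := pr.ReadNext()
		if err != nil {
			return err
		}
		if batch == nil { // end of stream, per ReadNext in packed_reader.go
			return nil
		}
		_ = batch.NumRows() // consume the batch
	}
}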