From d3a7dde4960bc9ed5a29a6d94b7a61dd650b1700 Mon Sep 17 00:00:00 2001 From: shaoting-huang Date: Tue, 11 Feb 2025 15:47:19 +0800 Subject: [PATCH] storage v2 packed api integration Signed-off-by: shaoting-huang --- internal/core/src/segcore/column_groups_c.cpp | 46 +++++++++++++++++++ internal/core/src/segcore/column_groups_c.h | 35 ++++++++++++++ internal/core/src/segcore/packed_reader_c.cpp | 5 +- internal/core/src/segcore/packed_reader_c.h | 2 + internal/core/src/segcore/packed_writer_c.cpp | 18 ++++++-- internal/core/src/segcore/packed_writer_c.h | 4 ++ .../thirdparty/milvus-storage/CMakeLists.txt | 4 +- internal/storagev2/packed/packed_reader.go | 20 ++++++-- internal/storagev2/packed/packed_test.go | 12 +++-- internal/storagev2/packed/packed_writer.go | 38 +++++++++++++-- 10 files changed, 163 insertions(+), 21 deletions(-) create mode 100644 internal/core/src/segcore/column_groups_c.cpp create mode 100644 internal/core/src/segcore/column_groups_c.h diff --git a/internal/core/src/segcore/column_groups_c.cpp b/internal/core/src/segcore/column_groups_c.cpp new file mode 100644 index 0000000000000..43bc0f9a400a6 --- /dev/null +++ b/internal/core/src/segcore/column_groups_c.cpp @@ -0,0 +1,46 @@ +// Copyright 2025 Zilliz +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "segcore/column_groups_c.h" +#include +#include +#include + +using VecVecInt = std::vector>; + +extern "C" { + +CColumnGroups NewCColumnGroups() { + auto vv = std::make_unique(); + return vv.release(); +} + +void AddCColumnGroup(CColumnGroups cgs, int* group, int group_size) { + if (!cgs || !group) + return; + + auto vv = static_cast(cgs); + vv->emplace_back(group, group + group_size); +} + +int CColumnGroupsSize(CColumnGroups cgs) { + if (!cgs) + return 0; + + auto vv = static_cast(cgs); + return static_cast(vv->size()); +} + +void FreeCColumnGroups(CColumnGroups cgs) { delete static_cast(cgs); } +} \ No newline at end of file diff --git a/internal/core/src/segcore/column_groups_c.h b/internal/core/src/segcore/column_groups_c.h new file mode 100644 index 0000000000000..fca407d417d5a --- /dev/null +++ b/internal/core/src/segcore/column_groups_c.h @@ -0,0 +1,35 @@ +// Copyright 2025 Zilliz +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +typedef void* CColumnGroups; + +CColumnGroups NewCColumnGroups(); + +void AddCColumnGroup(CColumnGroups cgs, int* group, int group_size); + +int CColumnGroupsSize(CColumnGroups cgs); + +void FreeCColumnGroups(CColumnGroups cgs); + +#ifdef __cplusplus +} +#endif \ No newline at end of file diff --git a/internal/core/src/segcore/packed_reader_c.cpp b/internal/core/src/segcore/packed_reader_c.cpp index 56aaf0e1977c5..632c9f3622971 100644 --- a/internal/core/src/segcore/packed_reader_c.cpp +++ b/internal/core/src/segcore/packed_reader_c.cpp @@ -25,11 +25,14 @@ int NewPackedReader(const char* path, + char** paths, + int64_t num_paths, struct ArrowSchema* schema, const int64_t buffer_size, CPackedReader* c_packed_reader) { try { auto truePath = std::string(path); + auto truePaths = std::vector(paths, paths + num_paths); auto factory = std::make_shared(); auto conf = milvus_storage::StorageConfig(); conf.uri = "file:///tmp/"; @@ -40,7 +43,7 @@ NewPackedReader(const char* path, needed_columns.emplace(i); } auto reader = std::make_unique( - *trueFs, path, trueSchema, needed_columns, buffer_size); + *trueFs, truePaths, trueSchema, needed_columns, buffer_size); *c_packed_reader = reader.release(); return 0; } catch (std::exception& e) { diff --git a/internal/core/src/segcore/packed_reader_c.h b/internal/core/src/segcore/packed_reader_c.h index 7a5c90cf16e3c..92894bf0f3027 100644 --- a/internal/core/src/segcore/packed_reader_c.h +++ b/internal/core/src/segcore/packed_reader_c.h @@ -34,6 +34,8 @@ typedef void* CArrowSchema; */ int NewPackedReader(const char* path, + char** paths, + int64_t num_paths, struct ArrowSchema* schema, const int64_t buffer_size, CPackedReader* c_packed_reader); diff --git a/internal/core/src/segcore/packed_writer_c.cpp b/internal/core/src/segcore/packed_writer_c.cpp index 613e21d78013a..3d3fff17586c2 100644 --- a/internal/core/src/segcore/packed_writer_c.cpp +++ b/internal/core/src/segcore/packed_writer_c.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "segcore/column_groups_c.h" #include "segcore/packed_writer_c.h" #include "milvus-storage/packed/writer.h" #include "milvus-storage/common/log.h" @@ -22,19 +23,28 @@ #include int -NewPackedWriter(const char* path, +NewPackedWriter(const char* fsPath, struct ArrowSchema* schema, const int64_t buffer_size, + char** paths, + int64_t num_paths, + CColumnGroups column_groups, CPackedWriter* c_packed_writer) { try { - auto truePath = std::string(path); + auto trueFsPath = std::string(fsPath); + auto truePaths = std::vector(paths, paths + num_paths); + auto factory = std::make_shared(); auto conf = milvus_storage::StorageConfig(); conf.uri = "file:///tmp/"; - auto trueFs = factory->BuildFileSystem(conf, &truePath).value(); + auto trueFs = factory->BuildFileSystem(conf, &trueFsPath).value(); + auto trueSchema = arrow::ImportSchema(schema).ValueOrDie(); + + auto columnGroups = *static_cast>*>(column_groups); + auto writer = std::make_unique( - buffer_size, trueSchema, trueFs, truePath, conf); + trueFs, truePaths, trueSchema, conf, columnGroups, buffer_size); *c_packed_writer = writer.release(); return 0; diff --git a/internal/core/src/segcore/packed_writer_c.h b/internal/core/src/segcore/packed_writer_c.h index 207aba502d468..244532fa4f54a 100644 --- a/internal/core/src/segcore/packed_writer_c.h +++ b/internal/core/src/segcore/packed_writer_c.h @@ -19,6 +19,7 @@ extern "C" { #endif #include +#include "segcore/column_groups_c.h" typedef void* CPackedWriter; @@ -26,6 +27,9 @@ int NewPackedWriter(const char* path, struct ArrowSchema* schema, const int64_t buffer_size, + char** paths, + int64_t num_paths, + CColumnGroups column_groups, CPackedWriter* c_packed_writer); int diff --git a/internal/core/thirdparty/milvus-storage/CMakeLists.txt b/internal/core/thirdparty/milvus-storage/CMakeLists.txt index a847c41a47597..190eb4043b746 100644 --- a/internal/core/thirdparty/milvus-storage/CMakeLists.txt +++ b/internal/core/thirdparty/milvus-storage/CMakeLists.txt @@ -14,8 +14,8 @@ # Update milvus-storage_VERSION for the first occurrence milvus_add_pkg_config("milvus-storage") set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "") -set( milvus-storage_VERSION 7475494 ) -set( GIT_REPOSITORY "https://github.com/milvus-io/milvus-storage.git") +set( milvus-storage_VERSION 9a05d71 ) +set( GIT_REPOSITORY "https://github.com/shaoting-huang/milvus-storage.git") message(STATUS "milvus-storage repo: ${GIT_REPOSITORY}") message(STATUS "milvus-storage version: ${milvus-storage_VERSION}") diff --git a/internal/storagev2/packed/packed_reader.go b/internal/storagev2/packed/packed_reader.go index c47b443af138e..ab57cdfed70c2 100644 --- a/internal/storagev2/packed/packed_reader.go +++ b/internal/storagev2/packed/packed_reader.go @@ -33,19 +33,29 @@ import ( "github.com/cockroachdb/errors" ) -func NewPackedReader(path string, schema *arrow.Schema, bufferSize int) (*PackedReader, error) { +func NewPackedReader(fsPath string, filePaths []string, schema *arrow.Schema, bufferSize int) (*PackedReader, error) { var cas cdata.CArrowSchema cdata.ExportArrowSchema(schema, &cas) cSchema := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas)) - cPath := C.CString(path) - defer C.free(unsafe.Pointer(cPath)) + + cFsPath := C.CString(fsPath) + defer C.free(unsafe.Pointer(cFsPath)) + + cFilePaths := make([]*C.char, len(filePaths)) + for i, path := range filePaths { + cFilePaths[i] = C.CString(path) + defer C.free(unsafe.Pointer(cFilePaths[i])) + } + + cFilePathsArray := (**C.char)(unsafe.Pointer(&cFilePaths[0])) + cNumPaths := C.int64_t(len(filePaths)) cBufferSize := C.int64_t(bufferSize) var cPackedReader C.CPackedReader - status := C.NewPackedReader(cPath, cSchema, cBufferSize, &cPackedReader) + status := C.NewPackedReader(cFsPath, cFilePathsArray, cNumPaths, cSchema, cBufferSize, &cPackedReader) if status != 0 { - return nil, fmt.Errorf("failed to new packed reader: %s, status: %d", path, status) + return nil, fmt.Errorf("failed to new packed reader: %s, status: %d", fsPath, status) } return &PackedReader{cPackedReader: cPackedReader, schema: schema}, nil } diff --git a/internal/storagev2/packed/packed_test.go b/internal/storagev2/packed/packed_test.go index 3043e0a80fbd2..18daa6af9d76c 100644 --- a/internal/storagev2/packed/packed_test.go +++ b/internal/storagev2/packed/packed_test.go @@ -68,8 +68,10 @@ func (suite *PackedTestSuite) TestPackedOneFile() { batches := 100 path := "/tmp" + paths := []string{"/tmp/100"} + columnGroups := [][]int{{0, 1, 2}} bufferSize := 10 * 1024 * 1024 // 10MB - pw, err := NewPackedWriter(path, suite.schema, bufferSize) + pw, err := NewPackedWriter(path, suite.schema, bufferSize, paths, columnGroups) suite.NoError(err) for i := 0; i < batches; i++ { err = pw.WriteRecordBatch(suite.rec) @@ -78,7 +80,7 @@ func (suite *PackedTestSuite) TestPackedOneFile() { err = pw.Close() suite.NoError(err) - reader, err := NewPackedReader(path, suite.schema, bufferSize) + reader, err := NewPackedReader(path, paths, suite.schema, bufferSize) suite.NoError(err) rr, err := reader.ReadNext() suite.NoError(err) @@ -118,8 +120,10 @@ func (suite *PackedTestSuite) TestPackedMultiFiles() { rec := b.NewRecord() defer rec.Release() path := "/tmp" + paths := []string{"/tmp/100", "/tmp/101"} + columnGroups := [][]int{{2}, {0, 1}} bufferSize := 10 * 1024 * 1024 // 10MB - pw, err := NewPackedWriter(path, suite.schema, bufferSize) + pw, err := NewPackedWriter(path, suite.schema, bufferSize, paths, columnGroups) suite.NoError(err) for i := 0; i < batches; i++ { err = pw.WriteRecordBatch(rec) @@ -128,7 +132,7 @@ func (suite *PackedTestSuite) TestPackedMultiFiles() { err = pw.Close() suite.NoError(err) - reader, err := NewPackedReader(path, suite.schema, bufferSize) + reader, err := NewPackedReader(path, paths, suite.schema, bufferSize) suite.NoError(err) var rows int64 = 0 var rr arrow.Record diff --git a/internal/storagev2/packed/packed_writer.go b/internal/storagev2/packed/packed_writer.go index bca82da0a1cf9..02b080813cde3 100644 --- a/internal/storagev2/packed/packed_writer.go +++ b/internal/storagev2/packed/packed_writer.go @@ -19,6 +19,7 @@ package packed #include #include "segcore/packed_writer_c.h" +#include "segcore/column_groups_c.h" #include "arrow/c/abi.h" #include "arrow/c/helpers.h" */ @@ -33,20 +34,47 @@ import ( "github.com/cockroachdb/errors" ) -func NewPackedWriter(path string, schema *arrow.Schema, bufferSize int) (*PackedWriter, error) { +func NewPackedWriter(fsPath string, schema *arrow.Schema, bufferSize int, filePaths []string, columnGroups [][]int) (*PackedWriter, error) { var cas cdata.CArrowSchema cdata.ExportArrowSchema(schema, &cas) cSchema := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas)) - cPath := C.CString(path) - defer C.free(unsafe.Pointer(cPath)) + cFsPath := C.CString(fsPath) + defer C.free(unsafe.Pointer(cFsPath)) cBufferSize := C.int64_t(bufferSize) + cFilePaths := make([]*C.char, len(filePaths)) + for i, path := range filePaths { + cFilePaths[i] = C.CString(path) // Convert Go string to C string + defer C.free(unsafe.Pointer(cFilePaths[i])) // Ensure memory is freed after use + } + + // Create a pointer to the array of C strings (char**) + cFilePathsArray := (**C.char)(unsafe.Pointer(&cFilePaths[0])) + cNumPaths := C.int64_t(len(filePaths)) + + cColumnGroups := C.NewCColumnGroups() + for _, group := range columnGroups { + cGroup := C.malloc(C.size_t(len(group)) * C.size_t(unsafe.Sizeof(C.int(0)))) + if cGroup == nil { + return nil, fmt.Errorf("failed to allocate memory for column groups") + } + + cGroupSlice := (*[1 << 30]C.int)(cGroup)[:len(group):len(group)] + for i, val := range group { + cGroupSlice[i] = C.int(val) + } + + C.AddCColumnGroup(cColumnGroups, (*C.int)(cGroup), C.int(len(group))) + + C.free(cGroup) + } + var cPackedWriter C.CPackedWriter - status := C.NewPackedWriter(cPath, cSchema, cBufferSize, &cPackedWriter) + status := C.NewPackedWriter(cFsPath, cSchema, cBufferSize, cFilePathsArray, cNumPaths, cColumnGroups, &cPackedWriter) if status != 0 { - return nil, fmt.Errorf("failed to new packed writer: %s, status: %d", path, status) + return nil, fmt.Errorf("failed to new packed writer: %s, status: %d", fsPath, status) } return &PackedWriter{cPackedWriter: cPackedWriter}, nil }