Skip to content

Commit

Permalink
storage v2 packed api integration
Browse files Browse the repository at this point in the history
Signed-off-by: shaoting-huang <[email protected]>
  • Loading branch information
shaoting-huang committed Feb 11, 2025
1 parent 15c8798 commit d3a7dde
Show file tree
Hide file tree
Showing 10 changed files with 163 additions and 21 deletions.
46 changes: 46 additions & 0 deletions internal/core/src/segcore/column_groups_c.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2025 Zilliz
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "segcore/column_groups_c.h"
#include <vector>
#include <string>
#include <memory>

using VecVecInt = std::vector<std::vector<int>>;

extern "C" {

CColumnGroups NewCColumnGroups() {
auto vv = std::make_unique<VecVecInt>();
return vv.release();
}

Check warning on line 27 in internal/core/src/segcore/column_groups_c.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/column_groups_c.cpp#L24-L27

Added lines #L24 - L27 were not covered by tests

void AddCColumnGroup(CColumnGroups cgs, int* group, int group_size) {
if (!cgs || !group)

Check warning on line 30 in internal/core/src/segcore/column_groups_c.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/column_groups_c.cpp#L29-L30

Added lines #L29 - L30 were not covered by tests
return;

auto vv = static_cast<VecVecInt*>(cgs);
vv->emplace_back(group, group + group_size);

Check warning on line 34 in internal/core/src/segcore/column_groups_c.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/column_groups_c.cpp#L33-L34

Added lines #L33 - L34 were not covered by tests
}

int CColumnGroupsSize(CColumnGroups cgs) {
if (!cgs)

Check warning on line 38 in internal/core/src/segcore/column_groups_c.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/column_groups_c.cpp#L37-L38

Added lines #L37 - L38 were not covered by tests
return 0;

auto vv = static_cast<VecVecInt*>(cgs);
return static_cast<int>(vv->size());

Check warning on line 42 in internal/core/src/segcore/column_groups_c.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/column_groups_c.cpp#L41-L42

Added lines #L41 - L42 were not covered by tests
}

void FreeCColumnGroups(CColumnGroups cgs) { delete static_cast<VecVecInt*>(cgs); }

Check warning on line 45 in internal/core/src/segcore/column_groups_c.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/column_groups_c.cpp#L45

Added line #L45 was not covered by tests
}
35 changes: 35 additions & 0 deletions internal/core/src/segcore/column_groups_c.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2025 Zilliz
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <stdint.h>

#ifdef __cplusplus
extern "C" {
#endif

typedef void* CColumnGroups;

CColumnGroups NewCColumnGroups();

void AddCColumnGroup(CColumnGroups cgs, int* group, int group_size);

int CColumnGroupsSize(CColumnGroups cgs);

void FreeCColumnGroups(CColumnGroups cgs);

#ifdef __cplusplus
}
#endif
5 changes: 4 additions & 1 deletion internal/core/src/segcore/packed_reader_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@

int
NewPackedReader(const char* path,
char** paths,
int64_t num_paths,
struct ArrowSchema* schema,
const int64_t buffer_size,
CPackedReader* c_packed_reader) {
try {
auto truePath = std::string(path);
auto truePaths = std::vector<std::string>(paths, paths + num_paths);

Check warning on line 35 in internal/core/src/segcore/packed_reader_c.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/packed_reader_c.cpp#L35

Added line #L35 was not covered by tests
auto factory = std::make_shared<milvus_storage::FileSystemFactory>();
auto conf = milvus_storage::StorageConfig();
conf.uri = "file:///tmp/";
Expand All @@ -40,7 +43,7 @@ NewPackedReader(const char* path,
needed_columns.emplace(i);
}
auto reader = std::make_unique<milvus_storage::PackedRecordBatchReader>(
*trueFs, path, trueSchema, needed_columns, buffer_size);
*trueFs, truePaths, trueSchema, needed_columns, buffer_size);

Check warning on line 46 in internal/core/src/segcore/packed_reader_c.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/packed_reader_c.cpp#L46

Added line #L46 was not covered by tests
*c_packed_reader = reader.release();
return 0;
} catch (std::exception& e) {
Expand Down
2 changes: 2 additions & 0 deletions internal/core/src/segcore/packed_reader_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ typedef void* CArrowSchema;
*/
int
NewPackedReader(const char* path,
char** paths,
int64_t num_paths,
struct ArrowSchema* schema,
const int64_t buffer_size,
CPackedReader* c_packed_reader);
Expand Down
18 changes: 14 additions & 4 deletions internal/core/src/segcore/packed_writer_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "segcore/column_groups_c.h"
#include "segcore/packed_writer_c.h"
#include "milvus-storage/packed/writer.h"
#include "milvus-storage/common/log.h"
Expand All @@ -22,19 +23,28 @@
#include <arrow/filesystem/filesystem.h>

int
NewPackedWriter(const char* path,
NewPackedWriter(const char* fsPath,

Check warning on line 26 in internal/core/src/segcore/packed_writer_c.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/packed_writer_c.cpp#L26

Added line #L26 was not covered by tests
struct ArrowSchema* schema,
const int64_t buffer_size,
char** paths,
int64_t num_paths,
CColumnGroups column_groups,
CPackedWriter* c_packed_writer) {
try {
auto truePath = std::string(path);
auto trueFsPath = std::string(fsPath);
auto truePaths = std::vector<std::string>(paths, paths + num_paths);

Check warning on line 35 in internal/core/src/segcore/packed_writer_c.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/packed_writer_c.cpp#L34-L35

Added lines #L34 - L35 were not covered by tests

auto factory = std::make_shared<milvus_storage::FileSystemFactory>();
auto conf = milvus_storage::StorageConfig();
conf.uri = "file:///tmp/";
auto trueFs = factory->BuildFileSystem(conf, &truePath).value();
auto trueFs = factory->BuildFileSystem(conf, &trueFsPath).value();

Check warning on line 40 in internal/core/src/segcore/packed_writer_c.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/packed_writer_c.cpp#L40

Added line #L40 was not covered by tests

auto trueSchema = arrow::ImportSchema(schema).ValueOrDie();

auto columnGroups = *static_cast<std::vector<std::vector<int>>*>(column_groups);

Check warning on line 44 in internal/core/src/segcore/packed_writer_c.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/packed_writer_c.cpp#L44

Added line #L44 was not covered by tests

auto writer = std::make_unique<milvus_storage::PackedRecordBatchWriter>(
buffer_size, trueSchema, trueFs, truePath, conf);
trueFs, truePaths, trueSchema, conf, columnGroups, buffer_size);

Check warning on line 47 in internal/core/src/segcore/packed_writer_c.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/packed_writer_c.cpp#L47

Added line #L47 was not covered by tests

*c_packed_writer = writer.release();
return 0;
Expand Down
4 changes: 4 additions & 0 deletions internal/core/src/segcore/packed_writer_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@ extern "C" {
#endif

#include <arrow/c/abi.h>
#include "segcore/column_groups_c.h"

typedef void* CPackedWriter;

int
NewPackedWriter(const char* path,
struct ArrowSchema* schema,
const int64_t buffer_size,
char** paths,
int64_t num_paths,
CColumnGroups column_groups,
CPackedWriter* c_packed_writer);

int
Expand Down
4 changes: 2 additions & 2 deletions internal/core/thirdparty/milvus-storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
# Update milvus-storage_VERSION for the first occurrence
milvus_add_pkg_config("milvus-storage")
set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "")
set( milvus-storage_VERSION 7475494 )
set( GIT_REPOSITORY "https://github.com/milvus-io/milvus-storage.git")
set( milvus-storage_VERSION 9a05d71 )
set( GIT_REPOSITORY "https://github.com/shaoting-huang/milvus-storage.git")
message(STATUS "milvus-storage repo: ${GIT_REPOSITORY}")
message(STATUS "milvus-storage version: ${milvus-storage_VERSION}")

Expand Down
20 changes: 15 additions & 5 deletions internal/storagev2/packed/packed_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,29 @@ import (
"github.com/cockroachdb/errors"
)

func NewPackedReader(path string, schema *arrow.Schema, bufferSize int) (*PackedReader, error) {
func NewPackedReader(fsPath string, filePaths []string, schema *arrow.Schema, bufferSize int) (*PackedReader, error) {
var cas cdata.CArrowSchema
cdata.ExportArrowSchema(schema, &cas)
cSchema := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas))
cPath := C.CString(path)
defer C.free(unsafe.Pointer(cPath))

cFsPath := C.CString(fsPath)
defer C.free(unsafe.Pointer(cFsPath))

cFilePaths := make([]*C.char, len(filePaths))
for i, path := range filePaths {
cFilePaths[i] = C.CString(path)
defer C.free(unsafe.Pointer(cFilePaths[i]))
}

cFilePathsArray := (**C.char)(unsafe.Pointer(&cFilePaths[0]))
cNumPaths := C.int64_t(len(filePaths))

cBufferSize := C.int64_t(bufferSize)

var cPackedReader C.CPackedReader
status := C.NewPackedReader(cPath, cSchema, cBufferSize, &cPackedReader)
status := C.NewPackedReader(cFsPath, cFilePathsArray, cNumPaths, cSchema, cBufferSize, &cPackedReader)
if status != 0 {
return nil, fmt.Errorf("failed to new packed reader: %s, status: %d", path, status)
return nil, fmt.Errorf("failed to new packed reader: %s, status: %d", fsPath, status)
}
return &PackedReader{cPackedReader: cPackedReader, schema: schema}, nil
}
Expand Down
12 changes: 8 additions & 4 deletions internal/storagev2/packed/packed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,10 @@ func (suite *PackedTestSuite) TestPackedOneFile() {
batches := 100

path := "/tmp"
paths := []string{"/tmp/100"}
columnGroups := [][]int{{0, 1, 2}}
bufferSize := 10 * 1024 * 1024 // 10MB
pw, err := NewPackedWriter(path, suite.schema, bufferSize)
pw, err := NewPackedWriter(path, suite.schema, bufferSize, paths, columnGroups)
suite.NoError(err)
for i := 0; i < batches; i++ {
err = pw.WriteRecordBatch(suite.rec)
Expand All @@ -78,7 +80,7 @@ func (suite *PackedTestSuite) TestPackedOneFile() {
err = pw.Close()
suite.NoError(err)

reader, err := NewPackedReader(path, suite.schema, bufferSize)
reader, err := NewPackedReader(path, paths, suite.schema, bufferSize)
suite.NoError(err)
rr, err := reader.ReadNext()
suite.NoError(err)
Expand Down Expand Up @@ -118,8 +120,10 @@ func (suite *PackedTestSuite) TestPackedMultiFiles() {
rec := b.NewRecord()
defer rec.Release()
path := "/tmp"
paths := []string{"/tmp/100", "/tmp/101"}
columnGroups := [][]int{{2}, {0, 1}}
bufferSize := 10 * 1024 * 1024 // 10MB
pw, err := NewPackedWriter(path, suite.schema, bufferSize)
pw, err := NewPackedWriter(path, suite.schema, bufferSize, paths, columnGroups)
suite.NoError(err)
for i := 0; i < batches; i++ {
err = pw.WriteRecordBatch(rec)
Expand All @@ -128,7 +132,7 @@ func (suite *PackedTestSuite) TestPackedMultiFiles() {
err = pw.Close()
suite.NoError(err)

reader, err := NewPackedReader(path, suite.schema, bufferSize)
reader, err := NewPackedReader(path, paths, suite.schema, bufferSize)
suite.NoError(err)
var rows int64 = 0
var rr arrow.Record
Expand Down
38 changes: 33 additions & 5 deletions internal/storagev2/packed/packed_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package packed
#include <stdlib.h>
#include "segcore/packed_writer_c.h"
#include "segcore/column_groups_c.h"
#include "arrow/c/abi.h"
#include "arrow/c/helpers.h"
*/
Expand All @@ -33,20 +34,47 @@ import (
"github.com/cockroachdb/errors"
)

func NewPackedWriter(path string, schema *arrow.Schema, bufferSize int) (*PackedWriter, error) {
func NewPackedWriter(fsPath string, schema *arrow.Schema, bufferSize int, filePaths []string, columnGroups [][]int) (*PackedWriter, error) {
var cas cdata.CArrowSchema
cdata.ExportArrowSchema(schema, &cas)
cSchema := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas))

cPath := C.CString(path)
defer C.free(unsafe.Pointer(cPath))
cFsPath := C.CString(fsPath)
defer C.free(unsafe.Pointer(cFsPath))

cBufferSize := C.int64_t(bufferSize)

cFilePaths := make([]*C.char, len(filePaths))
for i, path := range filePaths {
cFilePaths[i] = C.CString(path) // Convert Go string to C string
defer C.free(unsafe.Pointer(cFilePaths[i])) // Ensure memory is freed after use
}

// Create a pointer to the array of C strings (char**)
cFilePathsArray := (**C.char)(unsafe.Pointer(&cFilePaths[0]))
cNumPaths := C.int64_t(len(filePaths))

cColumnGroups := C.NewCColumnGroups()
for _, group := range columnGroups {
cGroup := C.malloc(C.size_t(len(group)) * C.size_t(unsafe.Sizeof(C.int(0))))
if cGroup == nil {
return nil, fmt.Errorf("failed to allocate memory for column groups")
}

cGroupSlice := (*[1 << 30]C.int)(cGroup)[:len(group):len(group)]
for i, val := range group {
cGroupSlice[i] = C.int(val)
}

C.AddCColumnGroup(cColumnGroups, (*C.int)(cGroup), C.int(len(group)))

C.free(cGroup)
}

var cPackedWriter C.CPackedWriter
status := C.NewPackedWriter(cPath, cSchema, cBufferSize, &cPackedWriter)
status := C.NewPackedWriter(cFsPath, cSchema, cBufferSize, cFilePathsArray, cNumPaths, cColumnGroups, &cPackedWriter)
if status != 0 {
return nil, fmt.Errorf("failed to new packed writer: %s, status: %d", path, status)
return nil, fmt.Errorf("failed to new packed writer: %s, status: %d", fsPath, status)
}
return &PackedWriter{cPackedWriter: cPackedWriter}, nil
}
Expand Down

0 comments on commit d3a7dde

Please sign in to comment.