Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mmap4 #2424

Merged
merged 28 commits into from
Jan 7, 2025
Merged

Mmap4 #2424

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
68df975
Add own mem template in hnsw store.
small-turtle-1 Dec 23, 2024
671a849
Merge remote-tracking branch 'upstream/main' into mmap2
small-turtle-1 Dec 23, 2024
9064cf1
Refactor hnsw with mmap.
small-turtle-1 Dec 24, 2024
ea06169
Merge remote-tracking branch 'upstream/main' into mmap2
small-turtle-1 Dec 24, 2024
da100b9
Fix hnsw save to const.
small-turtle-1 Dec 24, 2024
f84bf23
Add mmap to hnsw.
small-turtle-1 Dec 24, 2024
4f6b07d
Merge remote-tracking branch 'upstream/main' into mmap2
small-turtle-1 Dec 24, 2024
2a5a20e
Fix unit test.
small-turtle-1 Dec 25, 2024
ad2b530
Load hnsw from mmap.
small-turtle-1 Dec 26, 2024
95cbf8f
fix.
small-turtle-1 Dec 26, 2024
075c722
mmap
small-turtle-1 Dec 26, 2024
3dfccea
Not unmmap util cleanup.
small-turtle-1 Dec 27, 2024
6117842
Merge remote-tracking branch 'upstream/main' into mmap_hnsw
small-turtle-1 Dec 27, 2024
c9316e2
Fix.
small-turtle-1 Dec 27, 2024
abe72da
Fix.
small-turtle-1 Dec 27, 2024
02f98ad
Fix.
small-turtle-1 Dec 27, 2024
b74771c
Refactor file worker Load.
small-turtle-1 Jan 3, 2025
97d61e8
Add lock from ptr for bmp index.
small-turtle-1 Jan 3, 2025
f241e9c
Add optimize in bmp.
small-turtle-1 Jan 6, 2025
1398070
Add optimize test case.
small-turtle-1 Jan 6, 2025
65bf459
Merge remote-tracking branch 'upstream/main' into mmap2
small-turtle-1 Jan 6, 2025
c38e6eb
Merge remote-tracking branch 'origin/mmap2' into mmap4
small-turtle-1 Jan 6, 2025
9bd9bdf
Add optimize in mmap hnsw
small-turtle-1 Jan 6, 2025
a3f1e02
Fix unit test.
small-turtle-1 Jan 6, 2025
8ea1e2a
Merge remote-tracking branch 'upstream/main' into mmap4
small-turtle-1 Jan 6, 2025
8a7e6f8
Fix status.
small-turtle-1 Jan 6, 2025
087405b
Fix benchmark check.
small-turtle-1 Jan 6, 2025
2a03faf
Merge branch 'main' into mmap4
small-turtle-1 Jan 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions python/test_pysdk/test_knn.py
Original file line number Diff line number Diff line change
Expand Up @@ -1069,7 +1069,7 @@ def test_sparse_knn_with_index(self, check_data, suffix):
index.IndexType.BMP,
{"block_size": "8", "compress_type": "compress"}), ConflictType.Error)

# table_obj.optimize("idx1", {"topk": "3"})
table_obj.optimize("idx1", {"topk": "3"})

res, extra_result = (table_obj
.output(["*", "_row_id", "_similarity"])
Expand Down Expand Up @@ -1592,7 +1592,7 @@ def test_sparse_knn_with_invalid_alpha_beta(self, check_data, alpha, beta, suffi
index.IndexType.BMP,
{"block_size": "8", "compress_type": "compress"}), ConflictType.Error)

# table_obj.optimize("idx1", {"topk": "3"})
table_obj.optimize("idx1", {"topk": "3"})

with pytest.raises(InfinityException) as e:
res, extra_result = (table_obj
Expand Down Expand Up @@ -1910,7 +1910,7 @@ def test_match_sparse_index_hint(self, check_data, suffix):
index.IndexType.BMP,
{"block_size": "8", "compress_type": "raww"}), ConflictType.Error)

# table_obj.optimize("idx1", {"topk": "3"})
table_obj.optimize("idx1", {"topk": "3"})

res, extra_result = (table_obj
.output(["*", "_row_id", "_similarity"])
Expand Down
16 changes: 16 additions & 0 deletions src/parser/type/serialize.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ inline std::string ReadBuf<std::string>(const char *buf) {
return str;
}

// Specialization for the empty tuple: the buffer argument is not inspected
// and an empty tuple is returned.
template <>
inline std::tuple<> ReadBuf<std::tuple<>>(const char * /*buf*/) {
    return std::tuple<>{};
}

// Specialization for the empty tuple: the cursor is left where it was
// and an empty tuple is returned.
template <>
inline std::tuple<> ReadBufAdv<std::tuple<>>(const char *& /*buf*/) {
    return std::tuple<>{};
}

template <>
inline std::string ReadBufAdv<std::string>(const char *&buf) {
int32_t size = ReadBufAdv<int32_t>(buf);
Expand Down Expand Up @@ -101,6 +111,12 @@ inline void WriteBufAdv<std::string>(char *&buf, const std::string &value) {
buf += len;
}

// Specialization for the empty tuple: the body is intentionally empty,
// so the destination buffer is never touched.
template <>
inline void WriteBuf<std::tuple<>>(char *const /*buf*/, const std::tuple<> & /*value*/) {
}

// Specialization for the empty tuple: the body is intentionally empty,
// so nothing is written and the cursor is not advanced.
template <>
inline void WriteBufAdv<std::tuple<>>(char *& /*buf*/, const std::tuple<> & /*value*/) {
}

template <typename T>
inline void WriteBufVecAdv(char *&buf, const T *data, size_t size) {
static_assert(std::is_standard_layout_v<T>, "T must be POD");
Expand Down
4 changes: 2 additions & 2 deletions src/storage/buffer/buffer_handle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ BufferHandle::~BufferHandle() {
const void *BufferHandle::GetData() const { return data_; }

void *BufferHandle::GetDataMut() {
buffer_obj_->GetMutPointer();
data_ = buffer_obj_->GetMutPointer();
return data_;
}

const FileWorker *BufferHandle::GetFileWorker() const { return buffer_obj_->file_worker(); }

FileWorker *BufferHandle::GetFileWorkerMut() {
buffer_obj_->GetMutPointer();
data_ = buffer_obj_->GetMutPointer();
return buffer_obj_->file_worker();
}

Expand Down
13 changes: 11 additions & 2 deletions src/storage/buffer/buffer_obj.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ BufferHandle BufferObj::Load() {
std::unique_lock<std::mutex> locker(w_locker_);
if (type_ == BufferType::kMmap) {
switch (status_) {
case BufferStatus::kUnloaded:
case BufferStatus::kLoaded: {
break;
}
Expand Down Expand Up @@ -315,12 +316,20 @@ void BufferObj::LoadInner() {
++rc_;
}

void BufferObj::GetMutPointer() {
void *BufferObj::GetMutPointer() {
std::unique_lock<std::mutex> locker(w_locker_);
if (type_ == BufferType::kTemp) {
buffer_mgr_->RemoveTemp(this);
} else if (type_ == BufferType::kMmap) {
bool free_success = buffer_mgr_->RequestSpace(GetBufferSize());
if (!free_success) {
String error_message = "Out of memory.";
UnrecoverableError(error_message);
}
file_worker_->ReadFromFile(false);
}
type_ = BufferType::kEphemeral;
return file_worker_->GetData();
}

void BufferObj::UnloadInner() {
Expand All @@ -338,7 +347,7 @@ void BufferObj::UnloadInner() {
type_ = BufferType::kMmap;
} else if (type_ == BufferType::kMmap) {
file_worker_->MmapNotNeed();
status_ = BufferStatus::kFreed;
status_ = BufferStatus::kUnloaded;
} else {
buffer_mgr_->PushGCQueue(this);
status_ = BufferStatus::kUnloaded;
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/buffer_obj.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ private:
void LoadInner();

// called when BufferHandle needs mutable pointer.
void GetMutPointer();
void *GetMutPointer();

// called when BufferHandle destructs, to decrease rc_ by 1.
void UnloadInner();
Expand Down
8 changes: 6 additions & 2 deletions src/storage/buffer/file_worker/bmp_index_file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ bool BMPIndexFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success, c
return true;
}

void BMPIndexFileWorker::ReadFromFileImpl(SizeT file_size) {
void BMPIndexFileWorker::ReadFromFileImpl(SizeT file_size, bool from_spill) {
if (data_ != nullptr) {
UnrecoverableError("Data is already allocated.");
}
Expand All @@ -135,7 +135,11 @@ void BMPIndexFileWorker::ReadFromFileImpl(SizeT file_size) {
} else {
using IndexT = std::decay_t<decltype(*index)>;
if constexpr (IndexT::kOwnMem) {
index = new IndexT(IndexT::Load(*file_handle_));
if (from_spill) {
index = new IndexT(IndexT::Load(*file_handle_));
} else {
index = new IndexT(IndexT::LoadFromPtr(*file_handle_, file_size));
}
} else {
UnrecoverableError("BMPIndexFileWorker::ReadFromFileImpl: index does not own memory");
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/bmp_index_file_worker.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public:
protected:
bool WriteToFileImpl(bool to_spill, bool &prepare_success, const FileWorkerSaveCtx &ctx) override;

void ReadFromFileImpl(SizeT file_size) override;
void ReadFromFileImpl(SizeT file_size, bool from_spill) override;

bool ReadFromMmapImpl(const void *ptr, SizeT size) override;

Expand Down
6 changes: 3 additions & 3 deletions src/storage/buffer/file_worker/data_file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ bool DataFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success, const
return true;
}

void DataFileWorker::ReadFromFileImpl(SizeT file_size) {
void DataFileWorker::ReadFromFileImpl(SizeT file_size, bool from_spill) {

if (file_size < sizeof(u64) * 3) {
Status status = Status::DataIOError(fmt::format("Incorrect file length {}.", file_size));
Expand All @@ -116,7 +116,7 @@ void DataFileWorker::ReadFromFileImpl(SizeT file_size) {
RecoverableError(status);
}
if (magic_number != 0x00dd3344) {
Status status = Status::DataIOError(fmt::format("Read magic number which length isn't {}.", nbytes1));
Status status = Status::DataIOError(fmt::format("Read magic error, {} != 0x00dd3344.", magic_number));
RecoverableError(status);
}

Expand Down Expand Up @@ -153,7 +153,7 @@ bool DataFileWorker::ReadFromMmapImpl(const void *p, SizeT file_size) {
const char *ptr = static_cast<const char *>(p);
u64 magic_number = ReadBufAdv<u64>(ptr);
if (magic_number != 0x00dd3344) {
Status status = Status::DataIOError(fmt::format("Read magic number which length isn't {}.", magic_number));
Status status = Status::DataIOError(fmt::format("Read magic error: {} != 0x00dd3344.", magic_number));
RecoverableError(status);
}
u64 buffer_size = ReadBufAdv<u64>(ptr);
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/data_file_worker.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public:
protected:
bool WriteToFileImpl(bool to_spill, bool &prepare_success, const FileWorkerSaveCtx &ctx) override;

void ReadFromFileImpl(SizeT file_size) override;
void ReadFromFileImpl(SizeT file_size, bool from_spill) override;

bool ReadFromMmapImpl(const void *ptr, SizeT size) override;

Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/emvb_index_file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ bool EMVBIndexFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success,
return true;
}

void EMVBIndexFileWorker::ReadFromFileImpl(SizeT file_size) {
void EMVBIndexFileWorker::ReadFromFileImpl(SizeT file_size, bool from_spill) {
if (data_) {
const auto error_message = "Data is already allocated.";
UnrecoverableError(error_message);
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/emvb_index_file_worker.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public:
protected:
bool WriteToFileImpl(bool to_spill, bool &prepare_success, const FileWorkerSaveCtx &ctx) override;

void ReadFromFileImpl(SizeT file_size) override;
void ReadFromFileImpl(SizeT file_size, bool from_spill) override;

private:
const EmbeddingInfo *GetEmbeddingInfo() const;
Expand Down
6 changes: 3 additions & 3 deletions src/storage/buffer/file_worker/file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ void FileWorker::ReadFromFile(bool from_spill) {
}
file_handle_ = std::move(file_handle);
DeferFn defer_fn2([&]() { file_handle_ = nullptr; });
ReadFromFileImpl(file_size);
ReadFromFileImpl(file_size, from_spill);
}

void FileWorker::MoveFile() {
Expand Down Expand Up @@ -210,8 +210,8 @@ void FileWorker::CleanupTempFile() const {
}

void FileWorker::Mmap() {
if (mmap_addr_ != nullptr) {
return;
if (mmap_addr_ != nullptr || mmap_data_ != nullptr) {
this->Munmap();
}
auto [defer_fn, read_path] = GetFilePathInner(false);
bool use_object_cache = persistence_manager_ != nullptr;
Expand Down
3 changes: 1 addition & 2 deletions src/storage/buffer/file_worker/file_worker.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,8 @@ public:
protected:
virtual bool WriteToFileImpl(bool to_spill, bool &prepare_success, const FileWorkerSaveCtx &ctx = {}) = 0;

virtual void ReadFromFileImpl(SizeT file_size) = 0;
virtual void ReadFromFileImpl(SizeT file_size, bool from_spill) = 0;

private:
String ChooseFileDir(bool spill) const;

Pair<Optional<DeferFn<std::function<void()>>>, String> GetFilePathInner(bool spill);
Expand Down
61 changes: 58 additions & 3 deletions src/storage/buffer/file_worker/hnsw_file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,20 @@ bool HnswFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success, const
if constexpr (std::is_same_v<T, std::nullptr_t>) {
UnrecoverableError("Invalid index type.");
} else {
index->Save(*file_handle_);
using IndexT = std::decay_t<decltype(*index)>;
if constexpr (IndexT::kOwnMem) {
index->SaveToPtr(*file_handle_);
} else {
UnrecoverableError("Invalid index type.");
}
}
},
*hnsw_index);
prepare_success = true;
return true;
}

void HnswFileWorker::ReadFromFileImpl(SizeT file_size) {
void HnswFileWorker::ReadFromFileImpl(SizeT file_size, bool from_spill) {
if (data_ != nullptr) {
UnrecoverableError("Data is already allocated.");
}
Expand All @@ -134,10 +139,60 @@ void HnswFileWorker::ReadFromFileImpl(SizeT file_size) {
UnrecoverableError("Invalid index type.");
} else {
using IndexT = std::decay_t<decltype(*index)>;
index = IndexT::Load(*file_handle_).release();
if constexpr (IndexT::kOwnMem) {
if (from_spill) {
index = IndexT::Load(*file_handle_).release();
} else {
index = IndexT::LoadFromPtr(*file_handle_, file_size).release();
}
} else {
UnrecoverableError("Invalid index type.");
}
}
},
*hnsw_index);
}

// Builds the HNSW index object for a memory-mapped region.
//
// ptr  - start of the mapped bytes handed to IndexT::LoadFromPtr.
// size - byte count handed to IndexT::LoadFromPtr.
//
// Reports an unrecoverable error if mmap_data_ is already set, or if the
// variant produced by InitAbstractIndex holds nullptr or an index type whose
// kOwnMem is true. Returns true otherwise.
// NOTE(review): the `false` passed to InitAbstractIndex presumably selects the
// non-owning index flavour — confirm against HnswIndexInMem.
bool HnswFileWorker::ReadFromMmapImpl(const void *ptr, SizeT size) {
    if (mmap_data_ != nullptr) {
        UnrecoverableError("Mmap data is already allocated.");
    }

    // The wrapper is stored type-erased in mmap_data_; keep a typed alias for the visit below.
    auto *abstract_index = new AbstractHnsw(HnswIndexInMem::InitAbstractIndex(index_base_.get(), column_def_.get(), false));
    mmap_data_ = reinterpret_cast<u8 *>(abstract_index);

    std::visit(
        [&](auto &&index) {
            using T = std::decay_t<decltype(index)>;
            if constexpr (std::is_same_v<T, std::nullptr_t>) {
                UnrecoverableError("Invalid index type.");
            } else {
                using IndexT = std::decay_t<decltype(*index)>;
                if constexpr (IndexT::kOwnMem) {
                    // Only index types with kOwnMem == false are loaded on this path.
                    UnrecoverableError("Invalid index type.");
                } else {
                    const auto *cursor = static_cast<const char *>(ptr);
                    auto loaded = IndexT::LoadFromPtr(cursor, size);
                    index = loaded.release();
                }
            }
        },
        *abstract_index);
    return true;
}

// Releases the object that mmap_data_ points to.
//
// Reports an unrecoverable error if mmap_data_ is null. Otherwise deletes the
// concrete index held by the variant (unless the alternative is nullptr),
// then deletes the AbstractHnsw wrapper and resets mmap_data_ to nullptr.
void HnswFileWorker::FreeFromMmapImpl() {
    if (mmap_data_ == nullptr) {
        UnrecoverableError("Mmap data is not allocated.");
    }

    auto *abstract_index = reinterpret_cast<AbstractHnsw *>(mmap_data_);

    // Deletes whichever concrete index pointer the variant currently holds.
    const auto destroy_alternative = [&](auto &&index) {
        using T = std::decay_t<decltype(index)>;
        if constexpr (std::is_same_v<T, std::nullptr_t>) {
            // Nothing to delete for the nullptr alternative.
        } else {
            delete index;
        }
    };
    std::visit(destroy_alternative, *abstract_index);

    delete abstract_index;
    mmap_data_ = nullptr;
}

} // namespace infinity
6 changes: 5 additions & 1 deletion src/storage/buffer/file_worker/hnsw_file_worker.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ public:
protected:
bool WriteToFileImpl(bool to_spill, bool &prepare_success, const FileWorkerSaveCtx &ctx) override;

void ReadFromFileImpl(SizeT file_size) override;
void ReadFromFileImpl(SizeT file_size, bool from_spill) override;

bool ReadFromMmapImpl(const void *ptr, SizeT size) override;

void FreeFromMmapImpl() override;

private:
SizeT index_size_{};
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/ivf_index_file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ bool IVFIndexFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success, c
return true;
}

void IVFIndexFileWorker::ReadFromFileImpl(SizeT file_size) {
void IVFIndexFileWorker::ReadFromFileImpl(SizeT file_size, bool from_spill) {
if (!data_) [[likely]] {
auto index = IVFIndexInChunk::GetNewIVFIndexInChunk(index_base_.get(), column_def_.get());
index->ReadIndexInner(*file_handle_);
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/ivf_index_file_worker.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public:
protected:
bool WriteToFileImpl(bool to_spill, bool &prepare_success, const FileWorkerSaveCtx &ctx) override;

void ReadFromFileImpl(SizeT file_size) override;
void ReadFromFileImpl(SizeT file_size, bool from_spill) override;
};

} // namespace infinity
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/raw_file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ bool RawFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success, const
return true;
}

void RawFileWorker::ReadFromFileImpl(SizeT file_size) {
void RawFileWorker::ReadFromFileImpl(SizeT file_size, bool from_spill) {
buffer_size_ = file_handle_->FileSize();
data_ = static_cast<void *>(new char[buffer_size_]);
auto [nbytes, status1] = file_handle_->Read(data_, buffer_size_);
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/raw_file_worker.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public:
protected:
bool WriteToFileImpl(bool to_spill, bool &prepare_success, const FileWorkerSaveCtx &ctx) override;

void ReadFromFileImpl(SizeT file_size) override;
void ReadFromFileImpl(SizeT file_size, bool from_spill) override;

private:
SizeT buffer_size_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ bool SecondaryIndexFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_succ
return true;
}

void SecondaryIndexFileWorker::ReadFromFileImpl(SizeT file_size) {
void SecondaryIndexFileWorker::ReadFromFileImpl(SizeT file_size, bool from_spill) {
if (!data_) [[likely]] {
auto index = GetSecondaryIndexData(column_def_->type(), row_count_, false);
index->ReadIndexInner(*file_handle_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public:
protected:
bool WriteToFileImpl(bool to_spill, bool &prepare_success, const FileWorkerSaveCtx &ctx) override;

void ReadFromFileImpl(SizeT file_size) override;
void ReadFromFileImpl(SizeT file_size, bool from_spill) override;

const u32 row_count_{};
};
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/var_file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ bool VarFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success, const
return true;
}

void VarFileWorker::ReadFromFileImpl(SizeT file_size) {
void VarFileWorker::ReadFromFileImpl(SizeT file_size, bool from_spill) {
if (data_ != nullptr) {
String error_message = "Data is not allocated.";
UnrecoverableError(error_message);
Expand Down
Loading
Loading