From 167a5dcd87656ef0b6f6526fcc58e04cc46e205b Mon Sep 17 00:00:00 2001 From: f00943869 Date: Mon, 3 Nov 2025 09:15:45 +0800 Subject: [PATCH] GPU Direct Storage(GDS) --- ucm/store/device/cuda/CMakeLists.txt | 7 + ucm/store/device/cuda/cuda_device.cu | 108 ++++++++++ ucm/store/device/idevice.h | 3 + ucm/store/device/simu/simu_device.cc | 8 + ucm/store/infra/file/file.cc | 14 ++ ucm/store/infra/file/file.h | 1 + ucm/store/infra/file/ifile.h | 1 + ucm/store/infra/file/posix_file.h | 1 + ucm/store/infra/template/handle_recorder.h | 83 ++++++++ ucm/store/infra/template/hashmap.h | 190 ++++++++++++++++++ ucm/store/nfsstore/cc/api/nfsstore.cc | 2 +- ucm/store/nfsstore/cc/api/nfsstore.h | 3 +- .../cc/domain/trans/directstorage_queue.cc | 118 +++++++++++ .../cc/domain/trans/directstorage_queue.h | 66 ++++++ .../nfsstore/cc/domain/trans/trans_manager.h | 39 +++- ucm/store/nfsstore/cpy/nfsstore.py.cc | 1 + ucm/store/nfsstore/nfsstore_connector.py | 4 + 17 files changed, 643 insertions(+), 6 deletions(-) create mode 100644 ucm/store/infra/template/handle_recorder.h create mode 100644 ucm/store/infra/template/hashmap.h create mode 100644 ucm/store/nfsstore/cc/domain/trans/directstorage_queue.cc create mode 100644 ucm/store/nfsstore/cc/domain/trans/directstorage_queue.h diff --git a/ucm/store/device/cuda/CMakeLists.txt b/ucm/store/device/cuda/CMakeLists.txt index fa0db292..fc81364b 100644 --- a/ucm/store/device/cuda/CMakeLists.txt +++ b/ucm/store/device/cuda/CMakeLists.txt @@ -8,3 +8,10 @@ target_compile_options(storedevice PRIVATE --diag-suppress=128 --diag-suppress=2417 --diag-suppress=2597 -Wall -fPIC ) +add_library(Cuda::cudart UNKNOWN IMPORTED) +set_target_properties(Cuda::cudart PROPERTIES + INTERFACE_INCLUDE_DIRECTORIES "${CUDA_ROOT}/include" + IMPORTED_LOCATION "${CUDA_ROOT}/lib64/libcudart.so" + IMPORTED_LOCATION "${CUDA_ROOT}/lib64/libcufile.so" +) +target_link_libraries(storedevice PUBLIC Cuda::cudart) \ No newline at end of file diff --git a/ucm/store/device/cuda/cuda_device.cu b/ucm/store/device/cuda/cuda_device.cu index 235b860c..4b9e5014 100644 --- a/ucm/store/device/cuda/cuda_device.cu +++ b/ucm/store/device/cuda/cuda_device.cu @@ -24,6 +24,15 @@ #include #include "ibuffered_device.h" #include "logger/logger.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include "infra/template/handle_recorder.h" #define CUDA_TRANS_UNIT_SIZE (sizeof(uint64_t) * 2) #define CUDA_TRANS_BLOCK_NUMBER (32) @@ -90,6 +99,25 @@ struct fmt::formatter : formatter { namespace UC { +static Status CreateCuFileHandle(int fd, CUfileHandle_t& cuFileHandle) +{ + if (fd < 0) { + UC_ERROR("Invalid file descriptor: {}", fd); + return Status::Error(); + } + + CUfileDescr_t cfDescr{}; + cfDescr.handle.fd = fd; + cfDescr.type = CU_FILE_HANDLE_TYPE_OPAQUE_FD; + CUfileError_t err = cuFileHandleRegister(&cuFileHandle, &cfDescr); + if (err.err != CU_FILE_SUCCESS) { + UC_ERROR("Failed to register cuFile handle for fd {}: error {}", + fd, static_cast(err.err)); + return Status::Error(); + } + + return Status::OK(); +} template Status CudaApi(const char* caller, const char* file, const size_t line, const char* name, Api&& api, Args&&... args) @@ -133,12 +161,23 @@ class CudaDevice : public IBufferedDevice { return nullptr; } static void ReleaseDeviceArray(void* deviceArray) { CUDA_API(cudaFree, deviceArray); } + static std::once_flag gdsOnce_; public: + static Status InitGdsOnce(); CudaDevice(const int32_t deviceId, const size_t bufferSize, const size_t bufferNumber) : IBufferedDevice{deviceId, bufferSize, bufferNumber}, stream_{nullptr} { } + ~CudaDevice() { + HandlePool::Instance().ClearAll([](CUfileHandle_t h) { + cuFileHandleDeregister(h); + }); + + if (stream_ != nullptr) { + cudaStreamDestroy((cudaStream_t)stream_); + } + } Status Setup() override { auto status = Status::OK(); @@ -165,6 +204,52 @@ public: { return CUDA_API(cudaMemcpyAsync, dst, src, count, cudaMemcpyDeviceToHost, this->stream_); } + Status S2DSync(int fd, void* address, const size_t length, const size_t fileOffset, const size_t devOffset) override + { + CUfileHandle_t cuFileHandle = nullptr; + auto status = HandlePool::Instance().Get(fd, cuFileHandle, + [fd](CUfileHandle_t& handle) -> Status { + return CreateCuFileHandle(fd, handle); + }); + if (status.Failure()) { + return status; + } + ssize_t bytesRead = cuFileRead(cuFileHandle, address, length, fileOffset, devOffset); + HandlePool::Instance().Put(fd, [](CUfileHandle_t h) { + if (h != nullptr) { + cuFileHandleDeregister(h); + } + }); + + if (bytesRead < 0 || (size_t)bytesRead != length) { + UC_ERROR("cuFileRead failed for fd {}: expected {}, got {}", fd, length, bytesRead); + return Status::Error(); + } + return Status::OK(); + } + Status D2SSync(int fd, void* address, const size_t length, const size_t fileOffset, const size_t devOffset) override + { + CUfileHandle_t cuFileHandle = nullptr; + auto status = HandlePool::Instance().Get(fd, cuFileHandle, + [fd](CUfileHandle_t& handle) -> Status { + return CreateCuFileHandle(fd, handle); + }); + if (status.Failure()) { + return status; + } + ssize_t bytesWrite = cuFileWrite(cuFileHandle, address, length, fileOffset, devOffset); + HandlePool::Instance().Put(fd, [](CUfileHandle_t h) { + if (h != nullptr) { + cuFileHandleDeregister(h); + } + }); + + if (bytesWrite < 0 || (size_t)bytesWrite != length) { + UC_ERROR("cuFileWrite failed for fd {}: expected {}, got {}", fd, length, bytesWrite); + return Status::Error(); + } + return Status::OK(); + } Status AppendCallback(std::function cb) override { auto* c = new (std::nothrow) Closure(cb); @@ -226,6 +311,14 @@ private: cudaStream_t stream_; }; +Status DeviceFactory::Setup(bool useDirect) +{ + if (useDirect) { + return CudaDevice::InitGdsOnce(); + } + return Status::OK(); +} + std::unique_ptr DeviceFactory::Make(const int32_t deviceId, const size_t bufferSize, const size_t bufferNumber) { @@ -237,5 +330,20 @@ std::unique_ptr DeviceFactory::Make(const int32_t deviceId, const size_ return nullptr; } } +std::once_flag CudaDevice::gdsOnce_{}; +Status CudaDevice::InitGdsOnce() +{ + Status result = Status::OK(); + std::call_once(gdsOnce_, [&result]() { + CUfileError_t ret = cuFileDriverOpen(); + if (ret.err == CU_FILE_SUCCESS) { + UC_INFO("GDS driver initialized successfully"); + } else { + UC_ERROR("GDS driver initialization failed with error code: {}", static_cast(ret.err)); + result = Status::Error(); + } + }); + return result; +} } // namespace UC diff --git a/ucm/store/device/idevice.h b/ucm/store/device/idevice.h index 8670df3b..cccb4802 100644 --- a/ucm/store/device/idevice.h +++ b/ucm/store/device/idevice.h @@ -49,6 +49,8 @@ class IDevice { const size_t count) = 0; virtual Status D2HBatchSync(std::byte* hArr[], const std::byte* dArr[], const size_t number, const size_t count) = 0; + virtual Status S2DSync(int fd, void* address, const size_t length, const size_t fileOffset, const size_t devOffset) = 0; + virtual Status D2SSync(int fd, void* address, const size_t length, const size_t fileOffset, const size_t devOffset) = 0; protected: virtual std::shared_ptr MakeBuffer(const size_t size) = 0; @@ -59,6 +61,7 @@ class IDevice { class DeviceFactory { public: + static Status Setup(bool useDirect = false); static std::unique_ptr Make(const int32_t deviceId, const size_t bufferSize, const size_t bufferNumber); }; diff --git a/ucm/store/device/simu/simu_device.cc b/ucm/store/device/simu/simu_device.cc index a26b7174..bfa09c30 100644 --- a/ucm/store/device/simu/simu_device.cc +++ b/ucm/store/device/simu/simu_device.cc @@ -72,6 +72,14 @@ class SimuDevice : public IBufferedDevice { this->backend_.Push([=] { std::copy(src, src + count, dst); }); return Status::OK(); } + Status S2DSync(int fd, void* address, const size_t length, const size_t fileOffset, const size_t devOffset) override + { + return Status::Unsupported(); + } + Status D2SSync(int fd, void* address, const size_t length, const size_t fileOffset, const size_t devOffset) override + { + return Status::Unsupported(); + } Status AppendCallback(std::function cb) override { this->backend_.Push([=] { cb(true); }); diff --git a/ucm/store/infra/file/file.cc b/ucm/store/infra/file/file.cc index 2dd9b75e..31f92e15 100644 --- a/ucm/store/infra/file/file.cc +++ b/ucm/store/infra/file/file.cc @@ -87,6 +87,20 @@ Status File::Write(const std::string& path, const size_t offset, const size_t le return status; } +Status File::OpenForDirectIO(const std::string& path, uint32_t flags, int& fd) +{ + auto file = std::make_unique(path); + auto status = file->Open(flags); + if (status.Failure()) { + UC_ERROR("Failed to open file({}) with flags({}).", path, flags); + fd = -1; + return status; + } + fd = file->GetHandle(); + file.release(); + return Status::OK(); +} + void File::MUnmap(void* addr, size_t size) { FileImpl{{}}.MUnmap(addr, size); } void File::ShmUnlink(const std::string& path) { FileImpl{path}.ShmUnlink(); } diff --git a/ucm/store/infra/file/file.h b/ucm/store/infra/file/file.h index 45d1d45b..c897956c 100644 --- a/ucm/store/infra/file/file.h +++ b/ucm/store/infra/file/file.h @@ -41,6 +41,7 @@ class File { uintptr_t address, const bool directIo = false); static Status Write(const std::string& path, const size_t offset, const size_t length, const uintptr_t address, const bool directIo = false); + static Status OpenForDirectIO(const std::string& path, uint32_t flags, int& fd); static void MUnmap(void* addr, size_t size); static void ShmUnlink(const std::string& path); static void Remove(const std::string& path); diff --git a/ucm/store/infra/file/ifile.h b/ucm/store/infra/file/ifile.h index 74b77cba..f34dbc67 100644 --- a/ucm/store/infra/file/ifile.h +++ b/ucm/store/infra/file/ifile.h @@ -55,6 +55,7 @@ class IFile { IFile(const std::string& path) : path_{path} {} virtual ~IFile() = default; const std::string& Path() const { return this->path_; } + virtual int32_t GetHandle() const = 0; virtual Status MkDir() = 0; virtual Status RmDir() = 0; virtual Status Rename(const std::string& newName) = 0; diff --git a/ucm/store/infra/file/posix_file.h b/ucm/store/infra/file/posix_file.h index becbd28a..e688f88f 100644 --- a/ucm/store/infra/file/posix_file.h +++ b/ucm/store/infra/file/posix_file.h @@ -32,6 +32,7 @@ class PosixFile : public IFile { public: PosixFile(const std::string& path) : IFile{path}, handle_{-1} {} ~PosixFile() override; + int32_t GetHandle() const override { return handle_; } Status MkDir() override; Status RmDir() override; Status Rename(const std::string& newName) override; diff --git a/ucm/store/infra/template/handle_recorder.h b/ucm/store/infra/template/handle_recorder.h new file mode 100644 index 00000000..55b41829 --- /dev/null +++ b/ucm/store/infra/template/handle_recorder.h @@ -0,0 +1,83 @@ +#ifndef UC_INFRA_HANDLE_POOL_H +#define UC_INFRA_HANDLE_POOL_H + +#include +#include "status/status.h" +#include "hashmap.h" + +namespace UC { + +template +class HandlePool { +private: + struct PoolEntry { + HandleType handle; + uint64_t refCount; + }; + using PoolMap = HashMap, 10>; + PoolMap pool_; + +public: + HandlePool() = default; + HandlePool(const HandlePool&) = delete; + HandlePool& operator=(const HandlePool&) = delete; + + static HandlePool& Instance() + { + static HandlePool instance; + return instance; + } + + Status Get(const KeyType& key, HandleType& handle, + std::function instantiate) + { + auto result = pool_.GetOrCreate(key, [&instantiate](PoolEntry& entry) -> bool { + HandleType h{}; + + auto status = instantiate(h); + if (status.Failure()) { + return false; + } + + entry.handle = h; + entry.refCount = 1; + return true; + }); + + if (!result.has_value()) { + return Status::Error(); + } + + auto& entry = result.value().get(); + entry.refCount++; + handle = entry.handle; + return Status::OK(); + } + + void Put(const KeyType& key, + std::function cleanup) + { + pool_.Upsert(key, [&cleanup](PoolEntry& entry) -> bool { + entry.refCount--; + if (entry.refCount > 0) { + return false; + } + cleanup(entry.handle); + return true; + }); + } + + void ClearAll(std::function cleanup) + { + pool_.ForEach([&cleanup](const KeyType& key, PoolEntry& entry) { + (void)key; + cleanup(entry.handle); + }); + pool_.Clear(); + } +}; + +} // namespace UC + +#endif + diff --git a/ucm/store/infra/template/hashmap.h b/ucm/store/infra/template/hashmap.h new file mode 100644 index 00000000..104ec658 --- /dev/null +++ b/ucm/store/infra/template/hashmap.h @@ -0,0 +1,190 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#ifndef UNIFIEDCACHE_HASHMAP_H +#define UNIFIEDCACHE_HASHMAP_H + +#include +#include +#include +#include +#include +#include + +namespace UC { + +template , size_t ShardBits = 10> +class HashMap { + static_assert(ShardBits <= 10, "ShardBits too large"); + static constexpr size_t Shards = size_t{1} << ShardBits; + + struct alignas(64) Shard { + mutable std::shared_mutex mtx; + std::vector, std::optional>> slots; + size_t used = 0; + }; + + std::array shards_; + Hash hash_; + std::atomic size_{0}; + + static size_t ShardIndex(size_t h) noexcept { + return h & (Shards - 1); + } + + static size_t ProbeIdx(size_t idx, size_t cap) noexcept { + return (idx + 1) & (cap - 1); + } + + static bool IsEmpty(const std::optional& slot) noexcept { + return !slot.has_value(); + } + + void RehashShard(Shard& s) { + std::vector, std::optional>> old = + std::move(s.slots); + size_t new_cap = (old.empty() ? 8 : old.size() * 2); + s.slots.assign(new_cap, {std::optional{}, std::optional{}}); + s.used = 0; + + for (const auto& slot : old) { + if (!slot.first.has_value()) { + continue; + } + + const Key& k = *slot.first; + const Value& v = *slot.second; + size_t h = hash_(k); + size_t idx = (h >> ShardBits) & (new_cap - 1); + + while (!IsEmpty(s.slots[idx].first)) { + idx = ProbeIdx(idx, new_cap); + } + + s.slots[idx].first.emplace(k); + s.slots[idx].second.emplace(v); + ++s.used; + } + } + +public: + HashMap() = default; + std::optional> GetOrCreate( + const Key& key, + std::function creator) + { + size_t h = hash_(key); + auto& shard = shards_[ShardIndex(h)]; + std::unique_lock lg(shard.mtx); + + if (shard.used * 4 >= shard.slots.size() * 3) [[unlikely]] { + RehashShard(shard); + } + + size_t cap = shard.slots.size(); + if (cap == 0) { + RehashShard(shard); + cap = shard.slots.size(); + } + + size_t idx = (h >> ShardBits) & (cap - 1); + size_t start = idx; + + do { + if (shard.slots[idx].first.has_value() && *shard.slots[idx].first == key) { + return std::ref(*shard.slots[idx].second); + } + if (IsEmpty(shard.slots[idx].first)) { + Value newValue; + if (!creator(newValue)) { + return std::optional>{}; + } + shard.slots[idx].first.emplace(key); + shard.slots[idx].second.emplace(std::move(newValue)); + ++shard.used; + ++size_; + return std::ref(*shard.slots[idx].second); + } + idx = ProbeIdx(idx, cap); + } while (idx != start); + RehashShard(shard); + return GetOrCreate(key, creator); + } + + void Upsert(const Key& key, std::function updater) { + size_t h = hash_(key); + auto& shard = shards_[ShardIndex(h)]; + std::unique_lock lg(shard.mtx); + + size_t cap = shard.slots.size(); + if (cap == 0) { + return; + } + + size_t idx = (h >> ShardBits) & (cap - 1); + size_t start = idx; + + do { + if (shard.slots[idx].first.has_value() && *shard.slots[idx].first == key) { + bool shouldDelete = updater(*shard.slots[idx].second); + if (shouldDelete) { + shard.slots[idx].first.reset(); + shard.slots[idx].second.reset(); + --shard.used; + --size_; + } + return; + } + + if (IsEmpty(shard.slots[idx].first)) { + return; + } + + idx = ProbeIdx(idx, cap); + } while (idx != start); + } + + void ForEach(std::function visitor) { + for (auto& shard : shards_) { + std::shared_lock lg(shard.mtx); + for (auto& slot : shard.slots) { + if (slot.first.has_value()) { + visitor(*slot.first, *slot.second); + } + } + } + } + + void Clear() { + for (auto& shard : shards_) { + std::unique_lock lg(shard.mtx); + shard.slots.clear(); + shard.used = 0; + } + size_.store(0); + } +}; + +} // namespace UC + +#endif // UNIFIEDCACHE_HASHMAP_H \ No newline at end of file diff --git a/ucm/store/nfsstore/cc/api/nfsstore.cc b/ucm/store/nfsstore/cc/api/nfsstore.cc index 381369c5..e1da650c 100644 --- a/ucm/store/nfsstore/cc/api/nfsstore.cc +++ b/ucm/store/nfsstore/cc/api/nfsstore.cc @@ -45,7 +45,7 @@ class NFSStoreImpl : public NFSStore { status = this->transMgr_.Setup(config.transferDeviceId, config.transferStreamNumber, config.transferIoSize, config.transferBufferNumber, - this->spaceMgr_.GetSpaceLayout(), config.transferTimeoutMs); + this->spaceMgr_.GetSpaceLayout(), config.transferTimeoutMs, config.useDirect); if (status.Failure()) { UC_ERROR("Failed({}) to setup TsfTaskManager.", status); return status.Underlying(); diff --git a/ucm/store/nfsstore/cc/api/nfsstore.h b/ucm/store/nfsstore/cc/api/nfsstore.h index 6ba14c63..6a76d8b1 100644 --- a/ucm/store/nfsstore/cc/api/nfsstore.h +++ b/ucm/store/nfsstore/cc/api/nfsstore.h @@ -46,6 +46,7 @@ class NFSStore : public CCStore { size_t storageCapacity; bool recycleEnable; float recycleThresholdRatio; + bool useDirect; Config(const std::vector& storageBackends, const size_t kvcacheBlockSize, const bool transferEnable) @@ -53,7 +54,7 @@ class NFSStore : public CCStore { transferEnable{transferEnable}, transferDeviceId{-1}, transferStreamNumber{32}, transferIoSize{262144}, transferBufferNumber{512}, transferTimeoutMs{30000}, tempDumpDirEnable{false}, hotnessEnable{true}, hotnessInterval{60}, - storageCapacity{0}, recycleEnable{true}, recycleThresholdRatio{0.7f} + storageCapacity{0}, recycleEnable{true}, recycleThresholdRatio{0.7f}, useDirect{false} { } }; diff --git a/ucm/store/nfsstore/cc/domain/trans/directstorage_queue.cc b/ucm/store/nfsstore/cc/domain/trans/directstorage_queue.cc new file mode 100644 index 00000000..9b70d0c8 --- /dev/null +++ b/ucm/store/nfsstore/cc/domain/trans/directstorage_queue.cc @@ -0,0 +1,118 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#include "directstorage_queue.h" +#include "logger/logger.h" +#include "infra/file/file.h" +#include +#include +#include +#include + +namespace UC { + +Status DirectStorageQueue::Setup(const int32_t deviceId, const size_t bufferSize, const size_t bufferNumber, + TaskSet* failureSet, const SpaceLayout* layout, const size_t timeoutMs, + HandlePool* handlePool) +{ + this->deviceId_ = deviceId; + this->bufferSize_ = bufferSize; + this->bufferNumber_ = bufferNumber; + this->failureSet_ = failureSet; + this->layout_ = layout; + this->handlePool_ = handlePool; + auto success = + this->backend_.SetWorkerInitFn([this](auto& device) { return this->Init(device); }) + .SetWorkerFn([this](auto& shard, const auto& device) { this->Work(shard, device); }) + .SetWorkerExitFn([this](auto& device) { this->Exit(device); }) + .Run(); + return success ? Status::OK() : Status::Error(); +} + +void DirectStorageQueue::Push(std::list& shards) noexcept { this->backend_.Push(shards); } + +bool DirectStorageQueue::Init(Device& device) +{ + if (this->deviceId_ < 0) { return true; } + device = DeviceFactory::Make(this->deviceId_, this->bufferSize_, this->bufferNumber_); + if (!device) { return false; } + return device->Setup().Success(); +} + +void DirectStorageQueue::Exit(Device& device) { device.reset(); } + +void DirectStorageQueue::Work(Task::Shard& shard, const Device& device) +{ + if (this->failureSet_->Contains(shard.owner)) { + this->Done(shard, device, true); + return; + } + auto status = Status::OK(); + if (shard.type == Task::Type::DUMP) { + status = this->D2S(shard, device); + } else { + status = this->S2D(shard, device); + } + this->Done(shard, device, status.Success()); +} + +void DirectStorageQueue::Done(Task::Shard& shard, const Device& device, const bool success) +{ + if (!success) { this->failureSet_->Insert(shard.owner); } + if (!shard.done) { return; } + shard.done(); +} + +Status DirectStorageQueue::S2D(Task::Shard& shard, const Device& device) +{ + auto path = this->layout_->DataFilePath(shard.block, false); + int fd = -1; + auto status = handlePool_->Get(path, fd, + [&path](int& newFd) -> Status { + return File::OpenForDirectIO(path, + IFile::OpenFlag::READ_ONLY | IFile::OpenFlag::DIRECT, + newFd); + }); + if (status.Failure()) { + return status; + } + return device->S2DSync(fd, (void*)shard.address, shard.length, shard.offset, 0); +} + +Status DirectStorageQueue::D2S(Task::Shard& shard, const Device& device) +{ + auto path = this->layout_->DataFilePath(shard.block, true); + int fd = -1; + auto status = handlePool_->Get(path, fd, + [&path](int& newFd) -> Status { + return File::OpenForDirectIO(path, + IFile::OpenFlag::WRITE_ONLY | IFile::OpenFlag::DIRECT, + newFd); + }); + if (status.Failure()) { + return status; + } + return device->D2SSync(fd, (void*)shard.address, shard.length, shard.offset, 0); +} + +} // namespace UC diff --git a/ucm/store/nfsstore/cc/domain/trans/directstorage_queue.h b/ucm/store/nfsstore/cc/domain/trans/directstorage_queue.h new file mode 100644 index 00000000..5f054dde --- /dev/null +++ b/ucm/store/nfsstore/cc/domain/trans/directstorage_queue.h @@ -0,0 +1,66 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#ifndef UNIFIEDCACHE_DIRECTSTORAGE_QUEUE_H +#define UNIFIEDCACHE_DIRECTSTORAGE_QUEUE_H + +#include "device/idevice.h" +#include "space/space_layout.h" +#include "status/status.h" +#include "task_queue.h" +#include "task_set.h" +#include "thread/thread_pool.h" +#include "infra/template/handle_recorder.h" +#include +#include + +namespace UC { + +class DirectStorageQueue : public TaskQueue { + using Device = std::unique_ptr; + int32_t deviceId_{-1}; + size_t bufferSize_{0}; + size_t bufferNumber_{0}; + TaskSet* failureSet_{nullptr}; + const SpaceLayout* layout_{nullptr}; + ThreadPool backend_{}; + HandlePool* handlePool_{nullptr}; + +public: + Status Setup(const int32_t deviceId, const size_t bufferSize, const size_t bufferNumber, + TaskSet* failureSet, const SpaceLayout* layout, const size_t timeoutMs, + HandlePool* handlePool = nullptr); + void Push(std::list& shards) noexcept override; + +private: + bool Init(Device& device); + void Exit(Device& device); + void Work(Task::Shard& shard, const Device& device); + void Done(Task::Shard& shard, const Device& device, const bool success); + Status D2S(Task::Shard& shard, const Device& device); + Status S2D(Task::Shard& shard, const Device& device); +}; + +} // namespace UC + +#endif diff --git a/ucm/store/nfsstore/cc/domain/trans/trans_manager.h b/ucm/store/nfsstore/cc/domain/trans/trans_manager.h index b78013c9..6882c267 100644 --- a/ucm/store/nfsstore/cc/domain/trans/trans_manager.h +++ b/ucm/store/nfsstore/cc/domain/trans/trans_manager.h @@ -26,25 +26,56 @@ #include "posix_queue.h" #include "task_manager.h" +#include "directstorage_queue.h" +#include "infra/template/handle_recorder.h" +#include namespace UC { class TransManager : public TaskManager { +private: + HandlePool handlePool_; + public: Status Setup(const int32_t deviceId, const size_t streamNumber, const size_t ioSize, - const size_t bufferNumber, const SpaceLayout* layout, const size_t timeoutMs) + const size_t bufferNumber, const SpaceLayout* layout, const size_t timeoutMs, bool useDirect) { this->timeoutMs_ = timeoutMs; auto status = Status::OK(); + + status = DeviceFactory::Setup(useDirect); + if (status.Failure()) { + UC_ERROR("Failed to setup device factory"); + return status; + } + for (size_t i = 0; i < streamNumber; i++) { - auto q = std::make_shared(); - status = - q->Setup(deviceId, ioSize, bufferNumber, &this->failureSet_, layout, timeoutMs); + std::shared_ptr q; + + if(useDirect) { + auto directQ = std::make_shared(); + status = directQ->Setup(deviceId, ioSize, bufferNumber, &this->failureSet_, layout, timeoutMs, &handlePool_); + q = directQ; + } + else { + auto posixQ = std::make_shared(); + status = posixQ->Setup(deviceId, ioSize, bufferNumber, &this->failureSet_, layout, timeoutMs); + q = posixQ; + } + if (status.Failure()) { break; } this->queues_.emplace_back(std::move(q)); } return status; } + + ~TransManager() { + handlePool_.ClearAll([](int fd) { + if (fd >= 0) { + close(fd); + } + }); + } }; } // namespace UC diff --git a/ucm/store/nfsstore/cpy/nfsstore.py.cc b/ucm/store/nfsstore/cpy/nfsstore.py.cc index 1536d876..f239eecd 100644 --- a/ucm/store/nfsstore/cpy/nfsstore.py.cc +++ b/ucm/store/nfsstore/cpy/nfsstore.py.cc @@ -128,6 +128,7 @@ PYBIND11_MODULE(ucmnfsstore, module) config.def_readwrite("storageCapacity", &UC::NFSStorePy::Config::storageCapacity); config.def_readwrite("recycleEnable", &UC::NFSStorePy::Config::recycleEnable); config.def_readwrite("recycleThresholdRatio", &UC::NFSStorePy::Config::recycleThresholdRatio); + config.def_readwrite("useDirect", &UC::NFSStorePy::Config::useDirect); store.def(py::init<>()); store.def("CCStoreImpl", &UC::NFSStorePy::CCStoreImpl); store.def("Setup", &UC::NFSStorePy::Setup); diff --git a/ucm/store/nfsstore/nfsstore_connector.py b/ucm/store/nfsstore/nfsstore_connector.py index 3752ee03..dc0a879d 100644 --- a/ucm/store/nfsstore/nfsstore_connector.py +++ b/ucm/store/nfsstore/nfsstore_connector.py @@ -45,12 +45,16 @@ def __init__(self, config: Dict): ] block_size = int(config["kv_block_size"]) transfer_enable = True if config["role"] == "worker" else False + useDirect = config.get("useDirect", False) + transferStreamNumber = config.get("transferStreamNumber", 32) param = ucmnfsstore.NFSStore.Config( storage_backends, block_size, transfer_enable ) if transfer_enable: param.transferDeviceId = config["device"] param.transferIoSize = config["io_size"] + param.useDirect = useDirect + param.transferStreamNumber = transferStreamNumber param.storageCapacity = config.get("storageCapacity", 0) param.recycleEnable = True if config.get("recycleEnable", 0) == 1 else False