Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat] warmup report disk is full or not #2934

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions curvefs/src/client/curve_fuse_op.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

#include "curvefs/src/client/curve_fuse_op.h"

#include <fmt/format.h>

#include <cstring>
#include <memory>
#include <string>
Expand Down Expand Up @@ -285,8 +287,9 @@ void QueryWarmupTask(fuse_ino_t key, std::string *data) {
if (!ret) {
*data = "finished";
} else {
*data = std::to_string(progress.GetFinished()) + "/" +
std::to_string(progress.GetTotal());
*data =
fmt::format("{}/{}/{}", progress.GetFinished(), progress.GetTotal(),
progress.GetWarmupStorageErr());
}
VLOG(9) << "Warmup [" << key << "]" << *data;
}
Expand Down
2 changes: 1 addition & 1 deletion curvefs/src/client/s3/disk_cache_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ int DiskCacheManagerImpl::WriteReadDirect(const std::string fileName,
if (!diskCacheManager_->IsDiskUsedInited() ||
diskCacheManager_->IsDiskCacheFull()) {
VLOG(6) << "write disk file fail, disk full.";
return -1;
return 0;
}
int ret = diskCacheManager_->WriteReadDirect(fileName, buf, length);
if (ret < 0) {
Expand Down
11 changes: 11 additions & 0 deletions curvefs/src/client/s3/disk_cache_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,17 @@ class DiskCacheManagerImpl {
virtual int UmountDiskCache();

bool IsDiskCacheFull();
/**
* @brief
*
* @param fileName
* @param buf
* @param length
* @return int
* -2: disk full
* -1: write fail
* >=0: write success
*/
virtual int WriteReadDirect(const std::string fileName, const char* buf,
uint64_t length);
void InitMetrics(std::string fsName, std::shared_ptr<S3Metric> s3Metric);
Expand Down
2 changes: 1 addition & 1 deletion curvefs/src/client/s3/disk_cache_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ int DiskCacheRead::WriteDiskFile(const std::string fileName, const char *buf,
if (fd < 0) {
LOG(ERROR) << "open disk file error. errno = " << errno
<< ", file = " << fileName;
return fd;
return -1;
}
ssize_t writeLen = posixWrapper_->write(fd, buf, length);
if (writeLen < static_cast<ssize_t>(length)) {
Expand Down
12 changes: 11 additions & 1 deletion curvefs/src/client/s3/disk_cache_read.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,18 @@ class DiskCacheRead : public DiskCacheBase {
virtual ~DiskCacheRead() {}
virtual void Init(std::shared_ptr<PosixWrapper> posixWrapper,
const std::string cacheDir, uint32_t objectPrefix);
virtual int ReadDiskFile(const std::string name, char *buf, uint64_t offset,
virtual int ReadDiskFile(const std::string name, char* buf, uint64_t offset,
uint64_t length);
/**
* @brief
*
* @param fileName
* @param buf
* @param length
* @return int
-1: write fail
>=0: write success
*/
virtual int WriteDiskFile(const std::string fileName, const char *buf,
uint64_t length);
virtual int LinkWriteToRead(const std::string fileName,
Expand Down
21 changes: 19 additions & 2 deletions curvefs/src/client/warmup/warmup_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -777,9 +777,14 @@ void WarmupManagerS3Impl::PutObjectToCache(
case curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk:
ret = s3Adaptor_->GetDiskCacheManager()->WriteReadDirect(
context->key, context->buf, context->len);
if (ret < 0) {
if (ret == -1) {
iter->second.SetWarmupStorageErrorType(
WarmupStorageErrorType::WriteFail);
LOG_EVERY_SECOND(INFO)
<< "write read directly failed, key: " << context->key;
} else if (ret == -2) {
iter->second.SetWarmupStorageErrorType(
WarmupStorageErrorType::Full);
}
delete[] context->buf;
break;
Expand All @@ -788,7 +793,19 @@ void WarmupManagerS3Impl::PutObjectToCache(
if (kvClientManager_ != nullptr) {
kvClientManager_->Set(std::make_shared<SetKVCacheTask>(
context->key, context->buf, context->len,
[context](const std::shared_ptr<SetKVCacheTask>&) {
[context, this,
key](const std::shared_ptr<SetKVCacheTask>& task) {
{
ReadLockGuard lock(inode2ProgressMutex_);
auto iter = FindWarmupProgressByKeyLocked(key);
if (iter->second.GetStorageType() ==
curvefs::client::common::WarmupStorageType::
kWarmupStorageTypeKvClient &&
!task->res) {
iter->second.SetWarmupStorageErrorType(
WarmupStorageErrorType::WriteFail);
}
}
delete[] context->buf;
}));
}
Expand Down
41 changes: 37 additions & 4 deletions curvefs/src/client/warmup/warmup_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#ifndef CURVEFS_SRC_CLIENT_WARMUP_WARMUP_MANAGER_H_
#define CURVEFS_SRC_CLIENT_WARMUP_WARMUP_MANAGER_H_

#include <fmt/format.h>

#include <algorithm>
#include <atomic>
#include <cstdint>
Expand Down Expand Up @@ -64,6 +66,12 @@ using curve::common::BthreadRWLock;

using curvefs::client::common::WarmupStorageType;

enum WarmupStorageErrorType {
Ok = 0,
WriteFail = 1,
Full = 2,
};

class WarmupFile {
public:
explicit WarmupFile(fuse_ino_t key = 0, uint64_t fileLen = 0)
Expand Down Expand Up @@ -121,13 +129,15 @@ class WarmupProgress {
: total_(0),
finished_(0),
storageType_(type),
filePathInClient_(filePath) {}
filePathInClient_(filePath),
storageErr_(WarmupStorageErrorType::Ok) {}

WarmupProgress(const WarmupProgress& wp)
: total_(wp.total_),
finished_(wp.finished_),
storageType_(wp.storageType_),
filePathInClient_(wp.filePathInClient_) {}
filePathInClient_(wp.filePathInClient_),
storageErr_(wp.storageErr_) {}

void AddTotal(uint64_t add) {
std::lock_guard<std::mutex> lock(totalMutex_);
Expand All @@ -137,6 +147,7 @@ class WarmupProgress {
WarmupProgress& operator=(const WarmupProgress& wp) {
total_ = wp.total_;
finished_ = wp.finished_;
storageErr_ = wp.storageErr_;
return *this;
}

Expand All @@ -158,21 +169,43 @@ class WarmupProgress {
std::string ToString() {
std::lock_guard<std::mutex> lockT(totalMutex_);
std::lock_guard<std::mutex> lockF(finishedMutex_);
return "total:" + std::to_string(total_) +
",finished:" + std::to_string(finished_);
std::lock_guard<std::mutex> lockS(storageErrMutex_);
return fmt::format("total:{},finished:{},err:{}", total_, finished_,
storageErr_);
}

std::string GetFilePathInClient() { return filePathInClient_; }

WarmupStorageType GetStorageType() { return storageType_; }

void SetWarmupStorageErrorType(WarmupStorageErrorType err) {
std::lock_guard<std::mutex> lockS(storageErrMutex_);
storageErr_ = std::max(err, storageErr_);
}

std::string GetWarmupStorageErr() {
std::lock_guard<std::mutex> lockS(storageErrMutex_);
switch (storageErr_) {
case WarmupStorageErrorType::Ok:
return "Ok";
case WarmupStorageErrorType::WriteFail:
return "write fail";
case WarmupStorageErrorType::Full:
return "full";
default:
return "unkown";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unknown

}
}

private:
uint64_t total_;
std::mutex totalMutex_;
uint64_t finished_;
std::mutex finishedMutex_;
WarmupStorageType storageType_;
std::string filePathInClient_;
std::mutex storageErrMutex_;
WarmupStorageErrorType storageErr_;
};

using FuseOpReadFunctionType =
Expand Down
4 changes: 3 additions & 1 deletion tools-v2/pkg/cli/command/curvefs/warmup/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (qCmd *QueryCommand) RunCommand(cmd *cobra.Command, args []string) error {
break
}
strs := strings.Split(resultStr, "/")
if len(strs) != 2 {
if len(strs) < 3 {
break
}
finished, err := strconv.ParseUint(strs[0], 10, 64)
Expand All @@ -129,6 +129,8 @@ func (qCmd *QueryCommand) RunCommand(cmd *cobra.Command, args []string) error {
if err != nil {
break
}
status := strs[2]
bar.Describe(status)
bar.ChangeMax64(int64(total))
bar.Set64(int64(finished))
}
Expand Down