Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

curvefs(metaserver): Use DoublyBufferedData to replace read-write locks to manage copysets #2956

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ git_repository(
"//:thirdparties/brpc/brpc.patch",
"//:thirdparties/brpc/fix-gcc11.patch",
"//:thirdparties/brpc/0001-bvar-warning-on-conflict-bvar-name.patch",
"//:thirdparties/brpc/0002-Add-bthread-rwlock-try-rdlock.patch",
],
patch_args = ["-p1"],
)
Expand Down
6 changes: 6 additions & 0 deletions curvefs/src/metaserver/copyset/copyset_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ class CopysetNode : public braft::StateMachine {

virtual PeerId GetLeaderId() const;

GroupId GetGroupId() const;

MetaStore* GetMetaStore() const;

virtual uint64_t GetConfEpoch() const;
Expand Down Expand Up @@ -344,6 +346,10 @@ inline bool CopysetNode::IsLoading() const {
return isLoading_.load(std::memory_order_acquire);
}

// Returns, by value, the group id stored in groupId_.
inline GroupId CopysetNode::GetGroupId() const {
    return this->groupId_;
}

} // namespace copyset
} // namespace metaserver
} // namespace curvefs
Expand Down
223 changes: 142 additions & 81 deletions curvefs/src/metaserver/copyset/copyset_node_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,58 +32,48 @@
#include "curvefs/src/metaserver/copyset/utils.h"
#include "src/common/timeutility.h"

#include "src/common/concurrent/generic_name_lock.h"

namespace curvefs {
namespace metaserver {
namespace copyset {

using ::curve::common::TimeUtility;
using NameLockGuard = curve::common::GenericNameLockGuard<Mutex>;

// Reports whether the load-finished flag is currently set. The flag is read
// with acquire ordering.
bool CopysetNodeManager::IsLoadFinished() const {
    const bool finished = loadFinished_.load(std::memory_order_acquire);
    return finished;
}

// Deletes the copyset identified by (poolId, copysetId): the node is stopped,
// its data directory is handed to trash_.RecycleCopyset() when removeData is
// true, and the node is removed from copysets_.
// Returns false when the copyset is not found or when recycling its data
// directory fails; in the latter case the node is still removed.
// NOTE(review): this span interleaves a ReadLockGuard/WriteLockGuard based
// body with the DoublyBufferedData based DeleteCopysetNodeInternalLocked body
// - confirm which lines belong to the final file.
bool CopysetNodeManager::DeleteCopysetNodeInternal(PoolId poolId,
CopysetId copysetId,
bool removeData) {
GroupId groupId = ToGroupId(poolId, copysetId);

// stop copyset node first
{
ReadLockGuard lock(lock_);
auto it = copysets_.find(groupId);
if (it != copysets_.end()) {
it->second->Stop();
} else {
LOG(WARNING) << "Delete copyset failed, copyset "
<< ToGroupIdString(poolId, copysetId) << " not found";
return false;
}
// The visible callers of the "Locked" variant (DeleteCopysetNode,
// PurgeCopysetNode, CreateCopysetNode) take a NameLockGuard on
// copysetLifetimeNameLock_ for this group before calling it.
bool CopysetNodeManager::DeleteCopysetNodeInternalLocked(PoolId poolId,
CopysetId copysetId,
bool removeData) {
// hold a shared_ptr so the node can still be used after it is removed from
// copysets_ below
auto node = GetSharedCopysetNode(poolId, copysetId);
if (node == nullptr) {
LOG(WARNING) << "Delete copyset failed, copyset: "
<< ToGroupIdString(poolId, copysetId) << " not found";
return false;
}

// remove copyset node
{
WriteLockGuard lock(lock_);
auto it = copysets_.find(groupId);
if (it != copysets_.end()) {
bool ret = true;
if (removeData) {
std::string copysetDataDir = it->second->GetCopysetDataDir();
if (!trash_.RecycleCopyset(copysetDataDir)) {
LOG(WARNING) << "Recycle copyset remote data failed, "
"copyset data path: '"
<< copysetDataDir << "'";
ret = false;
}
}

copysets_.erase(it);
LOG(INFO) << "Delete copyset " << ToGroupIdString(poolId, copysetId)
<< " success";
return ret;
node->Stop();
LOG(INFO) << "Copyset " << ToGroupIdString(poolId, copysetId) << " stopped";

bool ret = true;
if (removeData) {
std::string copysetDataDir = node->GetCopysetDataDir();
if (!trash_.RecycleCopyset(copysetDataDir)) {
LOG(WARNING) << "Recycle copyset remove data failed, "
"copyset data path: '"
<< copysetDataDir << "'";
// the failure is reported through the return value, but the node is
// still removed from copysets_ below
ret = false;
}
}

return false;
// delete node
copysets_.Modify(RemoveCopysetNode, node);
LOG(INFO) << "Delete copyset " << ToGroupIdString(poolId, copysetId)
<< " success";
return ret;
}

bool CopysetNodeManager::Init(const CopysetNodeOptions& options) {
Expand All @@ -107,11 +97,11 @@ bool CopysetNodeManager::Start() {
loadFinished_.store(true, std::memory_order_release);
LOG(INFO) << "Reload copysets success";
return true;
} else {
running_.store(false, std::memory_order_release);
LOG(ERROR) << "Reload copysets failed";
return false;
}

running_.store(false, std::memory_order_release);
LOG(ERROR) << "Reload copysets failed";
return false;
}

bool CopysetNodeManager::Stop() {
Expand All @@ -123,16 +113,24 @@ bool CopysetNodeManager::Stop() {
loadFinished_.store(false);

{
ReadLockGuard lock(lock_);
for (auto& copyset : copysets_) {
butil::DoublyBufferedData<CopysetNodeMap>::ScopedPtr ptr;
if (copysets_.Read(&ptr) != 0) {
LOG(ERROR) << "Fail to get copyset nodes";
return false;
}

for (const auto& copyset : *ptr) {
copyset.second->Stop();
}
}

{
WriteLockGuard lock(lock_);
copysets_.clear();
}
auto clear = [](CopysetNodeMap& map) -> size_t {
map.clear();
return 1;
};

// clear copysets
copysets_.Modify(clear);

if (!trash_.Stop()) {
LOG(ERROR) << "Stop trash failed";
Expand All @@ -146,54 +144,105 @@ bool CopysetNodeManager::Stop() {

// Looks up the copyset node for (poolId, copysetId) and returns a non-owning
// raw pointer to it. Returns nullptr, after logging a warning, when the
// copyset map cannot be read or contains no entry for that group id.
// NOTE(review): this span interleaves the ReadLockGuard based lookup with the
// DoublyBufferedData based lookup - confirm which lines belong to the final
// file.
CopysetNode* CopysetNodeManager::GetCopysetNode(PoolId poolId,
CopysetId copysetId) {
ReadLockGuard lock(lock_);
butil::DoublyBufferedData<CopysetNodeMap>::ScopedPtr ptr;
if (copysets_.Read(&ptr) != 0) {
LOG(WARNING) << "Fail to get copyset: "
<< ToGroupIdString(poolId, copysetId);
return nullptr;
}

auto it = copysets_.find(ToGroupId(poolId, copysetId));
if (it != copysets_.end()) {
auto it = ptr->find(ToGroupId(poolId, copysetId));
if (it != ptr->end()) {
// raw pointer only; the map entry keeps ownership of the node
return it->second.get();
}

LOG(WARNING) << "Fail to get copyset: "
<< ToGroupIdString(poolId, copysetId);
return nullptr;
}

// Looks up the copyset node for (poolId, copysetId) and returns a copy of the
// shared_ptr stored in the copyset map, so the caller shares ownership of the
// node. Returns nullptr, after logging a warning, when the copyset map cannot
// be read or contains no entry for that group id.
// NOTE(review): this span interleaves the ReadLockGuard based lookup with the
// DoublyBufferedData based lookup - confirm which lines belong to the final
// file.
std::shared_ptr<CopysetNode> CopysetNodeManager::GetSharedCopysetNode(
PoolId poolId, CopysetId copysetId) {
ReadLockGuard lock(lock_);
butil::DoublyBufferedData<CopysetNodeMap>::ScopedPtr ptr;
if (copysets_.Read(&ptr) != 0) {
LOG(WARNING) << "Fail to get copyset: "
<< ToGroupIdString(poolId, copysetId);
return nullptr;
}

auto it = copysets_.find(ToGroupId(poolId, copysetId));
if (it != copysets_.end()) {
auto it = ptr->find(ToGroupId(poolId, copysetId));
if (it != ptr->end()) {
return it->second;
}

LOG(WARNING) << "Fail to get copyset: "
<< ToGroupIdString(poolId, copysetId);
return nullptr;
}

// Checks whether the copyset described by the request already exists and
// whether its peers match the requested ones.
// Returns:
//    0  no entry for the group id (also returned when the copyset map cannot
//       be read)
//   -1  the copyset exists, but its peer count differs from the request or a
//       requested peer address is not among its peers
//    1  the copyset exists and every requested peer address is among its peers
// NOTE(review): this span interleaves the ReadLockGuard based body with the
// DoublyBufferedData based body, so the peer comparison appears twice -
// confirm which lines belong to the final file.
int CopysetNodeManager::IsCopysetNodeExist(
const CreateCopysetRequest::Copyset& copyset) {
ReadLockGuard lock(lock_);
auto iter = copysets_.find(ToGroupId(copyset.poolid(),
copyset.copysetid()));
if (iter == copysets_.end()) {
butil::DoublyBufferedData<CopysetNodeMap>::ScopedPtr ptr;
if (copysets_.Read(&ptr) != 0) {
LOG(WARNING) << "Fail to get copyset: "
<< ToGroupIdString(copyset.poolid(), copyset.copysetid());
return 0;
}

auto iter = ptr->find(ToGroupId(copyset.poolid(), copyset.copysetid()));
if (iter == ptr->end()) {
return 0;
} else {
auto copysetNode = iter->second.get();
std::vector<Peer> peers;
copysetNode->ListPeers(&peers);
if (peers.size() != static_cast<size_t>(copyset.peers_size())) {
}

auto* copysetNode = iter->second.get();
std::vector<Peer> peers;
copysetNode->ListPeers(&peers);
// a different number of peers means the configurations cannot match
if (peers.size() != static_cast<size_t>(copyset.peers_size())) {
return -1;
}

// every requested peer must be found, by address, in the node's peer list
for (int i = 0; i < copyset.peers_size(); i++) {
const auto& cspeer = copyset.peers(i);
auto iter =
std::find_if(peers.begin(), peers.end(), [&cspeer](const Peer& p) {
return cspeer.address() == p.address();
});
if (iter == peers.end()) {
return -1;
}
}

for (int i = 0; i < copyset.peers_size(); i++) {
const auto& cspeer = copyset.peers(i);
auto iter = std::find_if(peers.begin(), peers.end(),
[&cspeer](const Peer& p) {
return cspeer.address() == p.address();
});
if (iter == peers.end()) {
return -1;
}
}
return 1;
}

// Callback handed to copysets_.Modify(): registers copysetNode in map under
// the node's group id.
// Returns 1 when the node was inserted, or 0 (after logging a warning) when an
// entry with the same group id is already present.
size_t CopysetNodeManager::AddCopysetNode(
    CopysetNodeMap& map, const std::shared_ptr<CopysetNode>& copysetNode) {
    assert(copysetNode != nullptr);

    const auto key = copysetNode->GetGroupId();
    if (map.find(key) == map.end()) {
        // the key was just confirmed absent, so the insertion must succeed
        const auto ret = map.emplace(key, copysetNode);
        CHECK(ret.second);
        return 1;
    }

    LOG(WARNING) << "Copyset node already exists: " << key;
    return 0;
}

// Callback handed to copysets_.Modify(): erases the entry stored in map under
// copysetNode's group id.
// Returns 1 when an entry was erased, or 0 (after logging a warning) when no
// entry with that group id exists.
size_t CopysetNodeManager::RemoveCopysetNode(
    CopysetNodeMap& map, const std::shared_ptr<CopysetNode>& copysetNode) {
    assert(copysetNode != nullptr);

    const auto key = copysetNode->GetGroupId();
    const auto pos = map.find(key);
    if (pos != map.end()) {
        map.erase(pos);
        return 1;
    }

    LOG(WARNING) << "Copyset node not found: " << key;
    return 0;
}

Expand All @@ -209,9 +258,11 @@ bool CopysetNodeManager::CreateCopysetNode(PoolId poolId, CopysetId copysetId,
braft::GroupId groupId = ToGroupId(poolId, copysetId);
std::shared_ptr<CopysetNode> copysetNode;

NameLockGuard guard(copysetLifetimeNameLock_, groupId);

{
WriteLockGuard lock(lock_);
if (copysets_.count(groupId) != 0) {
auto* exist = GetCopysetNode(poolId, copysetId);
if (exist != nullptr) {
LOG(WARNING) << "Copyset node already exists: "
<< ToGroupIdString(poolId, copysetId);
return false;
Expand All @@ -225,7 +276,7 @@ bool CopysetNodeManager::CreateCopysetNode(PoolId poolId, CopysetId copysetId,
return false;
}

copysets_.emplace(groupId, copysetNode);
copysets_.Modify(AddCopysetNode, copysetNode);
}

// node start maybe time-consuming
Expand All @@ -234,7 +285,7 @@ bool CopysetNodeManager::CreateCopysetNode(PoolId poolId, CopysetId copysetId,
// restart, in this case we should not automatically remove copyset's
// data
const bool removeData = checkLoadFinish;
DeleteCopysetNodeInternal(poolId, copysetId, removeData);
DeleteCopysetNodeInternalLocked(poolId, copysetId, removeData);
LOG(ERROR) << "Copyset " << ToGroupIdString(poolId, copysetId)
<< " start failed";
return false;
Expand All @@ -246,10 +297,16 @@ bool CopysetNodeManager::CreateCopysetNode(PoolId poolId, CopysetId copysetId,
}

// Fills *nodes with non-owning raw pointers to every copyset node in the
// copyset map. *nodes is cleared first, and is left empty (after logging a
// warning) when the copyset map cannot be read.
// NOTE(review): this span interleaves the ReadLockGuard based iteration with
// the DoublyBufferedData based iteration - confirm which lines belong to the
// final file.
void CopysetNodeManager::GetAllCopysets(
std::vector<CopysetNode *> *nodes) const {
std::vector<CopysetNode*>* nodes) const {
nodes->clear();
ReadLockGuard lock(lock_);
for (auto& copyset : copysets_) {
butil::DoublyBufferedData<CopysetNodeMap>::ScopedPtr ptr;
if (copysets_.Read(&ptr) != 0) {
LOG(WARNING) << "Fail to get all copysets";
return;
}

// size the output once before copying the pointers
nodes->reserve(ptr->size());
for (const auto& copyset : *ptr) {
nodes->push_back(copyset.second.get());
}
}
Expand All @@ -266,11 +323,15 @@ void CopysetNodeManager::AddService(brpc::Server* server,
}

// Deletes the copyset (poolId, copysetId) with removeData = false, i.e. the
// copyset's data directory is not handed to the trash. Returns the result of
// the internal delete routine.
// NOTE(review): this span interleaves the call to DeleteCopysetNodeInternal
// with the name-lock guarded call to DeleteCopysetNodeInternalLocked - confirm
// which lines belong to the final file.
bool CopysetNodeManager::DeleteCopysetNode(PoolId poolId, CopysetId copysetId) {
return DeleteCopysetNodeInternal(poolId, copysetId, false);
GroupId groupId = ToGroupId(poolId, copysetId);
// take the name lock for this group id before deleting; CreateCopysetNode
// and PurgeCopysetNode take the same lock
NameLockGuard guard(copysetLifetimeNameLock_, groupId);
return DeleteCopysetNodeInternalLocked(poolId, copysetId, false);
}

// Deletes the copyset (poolId, copysetId) with removeData = true, i.e. the
// copyset's data directory is also handed to the trash for recycling. Returns
// the result of the internal delete routine.
// NOTE(review): this span interleaves the call to DeleteCopysetNodeInternal
// with the name-lock guarded call to DeleteCopysetNodeInternalLocked - confirm
// which lines belong to the final file.
bool CopysetNodeManager::PurgeCopysetNode(PoolId poolId, CopysetId copysetId) {
return DeleteCopysetNodeInternal(poolId, copysetId, true);
GroupId groupId = ToGroupId(poolId, copysetId);
// take the name lock for this group id before purging; CreateCopysetNode
// and DeleteCopysetNode take the same lock
NameLockGuard guard(copysetLifetimeNameLock_, groupId);
return DeleteCopysetNodeInternalLocked(poolId, copysetId, true);
}

} // namespace copyset
Expand Down
Loading