diff --git a/lib/pmem/ARTree.cpp b/lib/pmem/ARTree.cpp index 234d062..148a4b2 100644 --- a/lib/pmem/ARTree.cpp +++ b/lib/pmem/ARTree.cpp @@ -251,7 +251,6 @@ uint64_t ARTree::GetLeafCount() { void ARTree::Put(const char *key, // copy value from std::string char *value) { - // printKey(key); persistent_ptr valPrstPtr = tree->findValueInNode(tree->treeRoot->rootNode, key, false); if (valPrstPtr == nullptr) @@ -272,27 +271,50 @@ void ARTree::Put(const char *key, int32_t keyBytes, const char *value, * TODO: decrement value std::atomic refCounter in parent and free it if * counter==0 When freeing - call decrementParent recurenty. * */ -void ARTree::decrementParent(persistent_ptr node) {} +void ARTree::decrementParent(persistent_ptr node, const char *key) { + size_t keyCalc = key[tree->treeRoot->keySize - node->depth - 1]; + std::bitset<8> x(keyCalc); + node->children[keyCalc] = nullptr; + node->refCounter--; + if (node->refCounter == 0) { + if (node->depth == 0) { + return; + } + + decrementParent(node->parent, key); + if (node->parent && node->parent->refCounter == 0) { + persistent_ptr nodeParent = node->parent; + pmemobj_free(nodeParent->children[0].raw_ptr()); + } + } +} -void ARTree::removeFromParent(persistent_ptr valPrstPtr) { - // @TODO: cleaning parent node not implemented - decrementParent(valPrstPtr->parent); +void ARTree::removeFromTree(persistent_ptr compressed, + const char *key) { + persistent_ptr current = compressed->parent; + decrementParent(current, key); + if (current->refCounter == 0) { + pmemobj_free(current->children[0].raw_ptr()); + } } void ARTree::Remove(const char *key) { persistent_ptr valPrstPtr = tree->findValueInNode(tree->treeRoot->rootNode, key, false); - if (valPrstPtr == nullptr || (valPrstPtr->location == EMPTY)) throw OperationFailedException(Status(KEY_NOT_FOUND)); try { - removeFromParent(valPrstPtr); + removeFromTree(valPrstPtr->parent, key); if (_isLocationReservedNotPublished(valPrstPtr)) { valPrstPtr->location = EMPTY; pmemobj_cancel(tree->_pm_pool.get_handle(), valPrstPtr->actionValue, 1); delete valPrstPtr->actionValue; + // TODO: transaction should be used + // removal of Node Compressed not needed - it is one bigger node + // Instead decrement counter in parent and free child when 0 + // remove ValueWrapper pmemobj_free(valPrstPtr.raw_ptr()); } else if (valPrstPtr->location == DISK) { // TODO: jradtke need to confirm if no extra action required here @@ -371,7 +393,6 @@ void TreeImpl::allocateFullLevels(persistent_ptr node, _pm_pool.get_handle(), &(actionsArray[actionsCounter]), NODE_SIZE[node->type] * sizeof(NodeLeafCompressed), VALUE); #endif - if (nodeLeafCompressed_new == nullptr) { DAQ_CRITICAL("reserve nodeLeafCompressed failed actionsCounter=" + std::to_string(actionsCounter) + " with " + @@ -438,8 +459,7 @@ TreeImpl::findValueInNode(persistent_ptr current, const char *_key, if (valPrstPtr == nullptr) { DAQ_CRITICAL("reserve ValueWrapper failed with " + std::string(strerror(errno))); - throw OperationFailedException( - Status(PMEM_ALLOCATION_ERROR)); + throw OperationFailedException(Status(PMEM_ALLOCATION_ERROR)); } // std::cout << "valueWrapper off=" // << (*nodeLeafCompressed->child.raw_ptr()).off @@ -454,14 +474,13 @@ TreeImpl::findValueInNode(persistent_ptr current, const char *_key, * parallel on the same key. This is still not thread-safe. */ valPrstPtr->location = EMPTY; + valPrstPtr->parent = nodeLeafCompressed; nodeLeafCompressed->child = valPrstPtr; } DAQ_DEBUG("findValueInNode: Found"); return nodeLeafCompressed->child; } keyCalc = key[treeRoot->keySize - current->depth - 1]; - std::bitset<8> x(keyCalc); - DAQ_DEBUG("findValueInNode: keyCalc=" + x.to_string()); if (current->type == TYPE256) { // TYPE256 node256 = current; if (!allocate && node256->children[keyCalc]) { @@ -496,14 +515,15 @@ TreeImpl::findValueInNode(persistent_ptr current, const char *_key, return nullptr; } -/*void ARTree::printKey(const char *key) { +void ARTree::printKey(const char *key) { std::mutex localMutex; std::lock_guard lock(localMutex); std::cout << "printing key" << std::endl; - for (int i = 0; i < KEY_SIZE; i++) { - std::cout << std::bitset<8>(key[KEY_SIZE - i - 1]) << std::endl; + for (int i = 0; i < tree->treeRoot->keySize; i++) { + std::cout << std::bitset<8>(key[tree->treeRoot->keySize - i - 1]) + << std::endl; } -}*/ +} /* * Allocate value in ARTree for given key. diff --git a/lib/pmem/ARTree.h b/lib/pmem/ARTree.h index 56f39bf..1b6763c 100644 --- a/lib/pmem/ARTree.h +++ b/lib/pmem/ARTree.h @@ -81,7 +81,7 @@ struct locationWrapper { int value; }; class Node; - +class NodeLeafCompressed; struct ValueWrapper { explicit ValueWrapper() : actionValue(nullptr), actionUpdate(nullptr), location(EMPTY) {} @@ -95,7 +95,8 @@ struct ValueWrapper { v locationVolatile; struct pobj_action *actionValue; struct pobj_action *actionUpdate; - persistent_ptr parent; // pointer to parent, needed for removal + persistent_ptr + parent; // pointer to parent, needed for removal }; class Node { @@ -176,8 +177,9 @@ class ARTree : public DaqDB::RTreeEngine { void AllocateIOVForKey(const char *key, uint64_t **ptr, size_t size) final; void UpdateValueWrapper(const char *key, uint64_t *ptr, size_t size) final; void printKey(const char *key); - void decrementParent(persistent_ptr node); - void removeFromParent(persistent_ptr valPrstPtr); + void decrementParent(persistent_ptr node, const char *key); + void removeFromTree(persistent_ptr current, + const char *key); private: inline bool