diff --git a/lib/pmem/ARTree.cpp b/lib/pmem/ARTree.cpp index cc8eb0f..78a3a98 100644 --- a/lib/pmem/ARTree.cpp +++ b/lib/pmem/ARTree.cpp @@ -273,6 +273,9 @@ void TreeImpl::allocateFullLevels(persistent_ptr node, actionsCounter++; node256_new->depth = depth; node256_new->type = TYPE256; + pmemobj_flush(_pm_pool.get_handle(), + pmemobj_direct(*node256_new.raw_ptr()), + sizeof(Node256)); children[i] = node256_new; } else if (LEVEL_TYPE[depth] == TYPE_LEAF_COMPRESSED) { nodeLeafCompressed_new = pmemobj_xreserve( @@ -290,6 +293,12 @@ void TreeImpl::allocateFullLevels(persistent_ptr node, nodeLeafCompressed_new->type = TYPE_LEAF_COMPRESSED; // Temporarily disable the node nodeLeafCompressed_new->key = -1; + pmemobj_flush(_pm_pool.get_handle(), + &nodeLeafCompressed_new->depth, sizeof(int)); + pmemobj_flush(_pm_pool.get_handle(), + &nodeLeafCompressed_new->type, sizeof(int)); + pmemobj_flush(_pm_pool.get_handle(), + &nodeLeafCompressed_new->key, sizeof(uint32_t)); children[i] = nodeLeafCompressed_new; } @@ -307,7 +316,17 @@ void TreeImpl::allocateFullLevels(persistent_ptr node, throw OperationFailedException(Status(ALLOCATION_ERROR)); } for (int i = 0; i < NODE_SIZE[node->type]; i++) { - node256->children[i] = children[i]; + // node256->children[i] = children[i]; + pmemobj_set_value(_pm_pool.get_handle(), + &(actionsArray[actionsCounter]), + &((node256->children[i]).raw_ptr()->pool_uuid_lo), + children[i].raw_ptr()->pool_uuid_lo); + actionsCounter++; + pmemobj_set_value(_pm_pool.get_handle(), + &(actionsArray[actionsCounter]), + &((node256->children[i]).raw_ptr()->off), + children[i].raw_ptr()->off); + actionsCounter++; } } } @@ -363,8 +382,12 @@ ValueWrapper *TreeImpl::findValueInNode(persistent_ptr current, * the same key. This is still not thread-safe. */ val->location = EMPTY; + pmemobj_flush(_pm_pool.get_handle(), &(val->location), + sizeof(val->location)); // Enable the node nodeLeafCompressed->key = keyCalc; + pmemobj_flush(_pm_pool.get_handle(), &(nodeLeafCompressed->key), + sizeof(nodeLeafCompressed->key)); return val; } else { DAQ_DEBUG("findValueInNode: not allocate, keyCalc=" + @@ -399,6 +422,8 @@ ValueWrapper *TreeImpl::findValueInNode(persistent_ptr current, int actionsCounter = 0; allocateFullLevels(node256, 1, actionsArray, actionsCounter); + // drain is not needed if we are publishing in the same + // thread pmemobj_drain(_pm_pool.get_handle()); int status = pmemobj_publish(_pm_pool.get_handle(), actionsArray, actionsCounter); actionsCounter = 0; diff --git a/lib/pmem/ARTree.h b/lib/pmem/ARTree.h index 250b8df..5192c2e 100644 --- a/lib/pmem/ARTree.h +++ b/lib/pmem/ARTree.h @@ -54,7 +54,9 @@ const int LEVEL_TYPE[] = {TYPE256, TYPE256, TYPE256, // how many levels will be created on ARTree initialization const int PREALLOC_LEVELS = 1; // size of table for actions for each Node -#define ACTION_NUMBER_NODE256 (1 + 256) +#define CHILDREN_SETTING_ACTIONS 2 * 256 +#define CHILDREN_NUMBER 256 +#define ACTION_NUMBER_NODE256 (1 + CHILDREN_NUMBER + CHILDREN_SETTING_ACTIONS) #define ACTION_NUMBER_COMPRESSED 1 #define KEY_SIZE 12 @@ -100,6 +102,7 @@ class Node { int depth; // Type of Node: Node256 or compressed int type; + std::atomic counter; }; /*