Commit 84a51492 authored by Philipp Götze's avatar Philipp Götze
Browse files

Added recovery functionality to wHBPTree

parent 4898add0
......@@ -22,6 +22,7 @@
#include <array>
#include <bitset>
#include <cmath>
#include <iostream>
#include <libpmemobj++/make_persistent.hpp>
......@@ -379,6 +380,43 @@ class wHBPTree {
});
return deleted;
}
/**
* Recover the wBPTree by iterating over the LeafList and using the recoveryInsert method.
*/
void recover() {
LOG("Starting RECOVERY of wHBPTree");
persistent_ptr<LeafNode> currentLeaf = leafList;
if (leafList == nullptr) {
LOG("No data to recover wHBPTree");
return;
}
/* counting leafs */
auto leafs = 0u;
while(currentLeaf != nullptr) {
++leafs;
currentLeaf = currentLeaf->nextLeaf;
}
float x = std::log(leafs)/std::log(N+1);
assert(x == int(x) && "Not supported for this amount of leafs, yet");
/* actual recovery */
currentLeaf = leafList;
if (leafList->nextLeaf == nullptr) {
// The index has only one node, so the leaf node becomes the root node
rootNode = leafList;
depth = 0;
} else {
rootNode = newBranchNode();
depth = 1;
rootNode.branch->children[0] = currentLeaf;
currentLeaf = currentLeaf->nextLeaf;
while (currentLeaf != nullptr) {
recoveryInsert(currentLeaf);
currentLeaf = currentLeaf->nextLeaf;
}
}
LOG("RECOVERY Done")
}
/**
* Print the structure and content of the tree to stdout.
......@@ -1155,6 +1193,97 @@ class wHBPTree {
sibling->search.data.slot[0] += node->search.data.slot[0] + 1;
}
/**
* Insert a leaf node into the tree for recovery
* @param leaf the leaf to insert
*/
void recoveryInsert(persistent_ptr<LeafNode> leaf){
assert(depth > 0);
assert(leaf != nullptr);
SplitInfo splitInfo;
auto hassplit = recoveryInsertInBranchNode(rootNode.branch, depth, leaf, &splitInfo);
//Check for split
if (hassplit) {
//The root node was splitted
auto newRoot = newBranchNode();
newRoot->keys[0] = splitInfo.key;
newRoot->children[0] = splitInfo.leftChild;
newRoot->children[N] = splitInfo.rightChild;
newRoot->search.data.b.set(0);
newRoot->search.data.slot[0] = 1;
newRoot->search.data.slot[1] = 0;
rootNode = newRoot;
++depth;
}
}
/**
* Insert a leaf node into the tree recursively by following the path
* down to the leaf level starting at node @c node at depth @c depth.
*
* @param node the starting node for the insert
* @param depth the current depth of the tree (0 == leaf level)
* @param leaf the leaf node to insert
* @param splitInfo information about the split
* @return true if a split was performed
*/
bool recoveryInsertInBranchNode(BranchNode *node, int curr_depth, persistent_ptr<LeafNode> leaf, SplitInfo *splitInfo){
bool hassplit=false;
SplitInfo childSplitInfo;
auto &nSlotArray = node->search.data.slot;
const auto &lSlotArray = leaf->search.get_ro().data.slot;
if (curr_depth == 1) {
if (nSlotArray[0] == N) {
/* we have to insert a new right child, as the keys in the leafList are sorted */
BranchNode *newNode = newBranchNode();
newNode->search.data.slot[1] = 0;
newNode->children[0] = leaf;
splitInfo->key = leaf->keys.get_ro()[lSlotArray[1]];
splitInfo->leftChild = node;
splitInfo->rightChild = newNode;
return true;
} else {
node->search.data.b.set(nSlotArray[0]);
node->keys[nSlotArray[0]] = leaf->keys.get_ro()[lSlotArray[1]];
if (nSlotArray[0] > 0)
node->children[nSlotArray[0]] = node->children[N];
node->children[N] = leaf;
nSlotArray[nSlotArray[0] + 1] = nSlotArray[0];
++nSlotArray[0];
return false;
}
} else {
hassplit = recoveryInsertInBranchNode(node->children[nSlotArray[0]].branch, curr_depth- 1, leaf, &childSplitInfo);
}
//Check for split
if (hassplit) {
if (nSlotArray[0] == N) {
BranchNode *newNode = newBranchNode();
newNode->search.data.slot[1] = 0;
newNode->children[0] = childSplitInfo.rightChild;
splitInfo->key = childSplitInfo.key;
splitInfo->leftChild = node;
splitInfo->rightChild = newNode;
return true;
} else {
node->search.data.b.set(nSlotArray[0]);
node->keys[nSlotArray[0]] = childSplitInfo.key;
if (nSlotArray[0] > 0)
node->children[nSlotArray[0]] = node->children[N];
node->children[N] = childSplitInfo.rightChild;
nSlotArray[nSlotArray[0] + 1] = nSlotArray[0];
++nSlotArray[0];
return false;
}
}
return false;
}
/**
* Print the given leaf node @c node to standard output.
*
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment