Commit 50f654e6 authored by Philipp Götze's avatar Philipp Götze
Browse files

Added new Btree variant (Hybrid bitmap - BitHPBPTree) +++ adaption...

 Added new Btree variant (Hybrid bitmap - BitHPBPTree) +++ adaption of search functions +++ recovery test case
parent 2a53b0de
Pipeline #276 canceled with stages
in 11 minutes and 49 seconds
......@@ -6,7 +6,9 @@ project (nvm-DS)
################################################################################
# Set the mount path of your pmem device where the structures should be stored
set(PMEM_MNT_PATH "/mnt/pmem0/test")
if(NOT PMEM_MNT_PATH)
set(PMEM_MNT_PATH "/mnt/pmem0/test")
endif()
# Installation path (creates include and lib within this directory)
if(NOT PROJECT_INSTALL_DIR)
......
This diff is collapsed.
......@@ -454,30 +454,32 @@ class BitPBPTree {
if (pos > 0 && (prevNumKeys = leafRef.prevLeaf->bits.get_ro().count()) > middle) {
/// we have a sibling at the left for rebalancing the keys
balanceLeafNodes(leafRef.prevLeaf, leaf);
const auto newMin = leafRef.keys.get_ro()[findMinKeyAtPNode(leaf, leafRef.bits.get_ro())];
const auto prevPos = findMinKeyGreaterThanAtPNode(leaf, leafRef.bits.get_ro(), newMin);
const auto newMin = leafRef.keys.get_ro()[findMinKey(leafRef.keys.get_ro(),
leafRef.bits.get_ro())];
const auto prevPos = findMinKeyGreaterThan(leafRef.keys.get_ro(),
leafRef.bits.get_ro(), newMin);
nodeKeys[prevPos] = newMin;
} else if (pos < N && leafRef.nextLeaf->bits.get_ro().count() > middle) {
/// we have a sibling at the right for rebalancing the keys
balanceLeafNodes(leafRef.nextLeaf, leaf);
const auto &nextLeaf = *leafRef.nextLeaf;
nodeKeys[pos] =
nextLeaf.keys.get_ro()[findMinKeyAtPNode(leafRef.nextLeaf, nextLeaf.bits.get_ro())];
nextLeaf.keys.get_ro()[findMinKey(nextLeaf.keys.get_ro(), nextLeaf.bits.get_ro())];
} else {
/// 2. if this fails we have to merge two leaf nodes but only if both nodes have the same
/// direct parent
pptr<LeafNode> survivor = nullptr;
auto &nodeBits = nodeRef.bits.get_rw();
auto &nodeChilds = nodeRef.children.get_rw();
auto &nodeBits = nodeRef.bits.get_rw();
if (findMinKeyAtPNode(node, nodeBits) != pos && prevNumKeys <= middle) {
if (findMinKey(nodeKeys, nodeBits) != pos && prevNumKeys <= middle) {
/// merge left
survivor = mergeLeafNodes(leafRef.prevLeaf, leaf);
deleteLeafNode(leaf);
/// move to next left slot
const auto prevPos = (pos == N) ?
findMaxKeyAtPNode(node, nodeBits) : ///< we need a new rightmost node
findMaxKeySmallerThanAtPNode(node, nodeBits, nodeKeys[pos]);
findMaxKey(nodeKeys, nodeBits) : ///< we need a new rightmost node
findMaxKeySmallerThan(nodeKeys, nodeBits, nodeKeys[pos]);
nodeChilds[pos] = nodeChilds[prevPos];
nodeBits.reset(prevPos);
} else if (pos < N && leafRef.nextLeaf->bits.get_ro().count() <= middle) {
......@@ -485,7 +487,7 @@ class BitPBPTree {
survivor = mergeLeafNodes(leaf, leafRef.nextLeaf);
deleteLeafNode(leafRef.nextLeaf);
/// move to next right slot
const auto nextPos = findMinKeyGreaterThanAtPNode(node, nodeBits, nodeKeys[pos]);
const auto nextPos = findMinKeyGreaterThan(nodeKeys, nodeBits, nodeKeys[pos]);
nodeChilds[nextPos] = nodeChilds[pos];
nodeBits.reset(pos);
} else {
......@@ -564,7 +566,7 @@ class BitPBPTree {
if (donorKeys[0] < receiverKeys[0]) {
/// move from one node to a node with larger keys
for (auto i = 0u; i < toMove; ++i) {
const auto max = findMaxKeyAtPNode(donor, donorBits);
const auto max = findMaxKey(donorKeys, donorBits);
const auto u = BitOperations::getFreeZero(receiverBits);
/// move the donor's maximum key to the receiver
receiverKeys[u] = donorKeys[max];
......@@ -575,7 +577,7 @@ class BitPBPTree {
} else {
/// move from one node to a node with smaller keys
for (auto i = 0u; i < toMove; ++i) {
const auto min = findMinKeyAtPNode(donor, donorBits);
const auto min = findMinKey(donorKeys, donorBits);
const auto u = BitOperations::getFreeZero(receiverBits);
/// move the donor's minimum key to the receiver
receiverKeys[u] = donorKeys[min];
......@@ -652,10 +654,10 @@ class BitPBPTree {
const auto &nodeKeys = nodeRef.keys.get_ro();
const auto &nodeChilds = nodeRef.children.get_ro();
auto &nodeBits = nodeRef.bits.get_rw();
const auto nMinKeyPos = findMinKeyAtPNode(node, nodeBits);
const auto prevPos = findMaxKeySmallerThanAtPNode(node, nodeBits, nodeKeys[pos]); //could be N
const auto nMinKeyPos = findMinKey(nodeKeys, nodeBits);
const auto prevPos = findMaxKeySmallerThan(nodeKeys, nodeBits, nodeKeys[pos]); //could be N
const auto prevNumKeys = nodeChilds[prevPos].branch->bits.get_ro().count();
const auto nextPos = findMinKeyGreaterThanAtPNode(node, nodeBits, nodeKeys[pos]);
const auto nextPos = findMinKeyGreaterThan(nodeKeys, nodeBits, nodeKeys[pos]);
const auto nextNumKeys = nodeChilds[nextPos].branch->bits.get_ro().count();
constexpr auto middle = (N + 1) / 2;
......@@ -688,7 +690,7 @@ class BitPBPTree {
mergeBranchNodes(child, nodeKeys[pos], rSibling);
deleteBranchNode(rSibling);
nodeBits.reset(pos);
if (pos == findMaxKeyAtPNode(node, nodeBits))
if (pos == findMaxKey(nodeKeys, nodeBits))
nodeRef.children.get_rw()[N] = child; ///< new rightmost child
return child;
} else assert(false); ///< shouldn't happen
......@@ -716,8 +718,8 @@ class BitPBPTree {
auto &sibChilds = sibRef.children.get_rw();
auto &sibBits = sibRef.bits.get_rw();
assert(key <= nodeKeys[findMinKeyAtPNode(node, nodeBits)]);
assert(sibKeys[findMaxKeyAtPNode(sibling, sibBits)] < key);
assert(key <= nodeKeys[findMinKey(nodeKeys, nodeBits)]);
assert(sibKeys[findMaxKey(sibKeys, sibBits)] < key);
auto u = BitOperations::getFreeZero(sibBits);
sibKeys[u] = key;
......@@ -774,7 +776,7 @@ class BitPBPTree {
receiverBits.set(u);
/// 1.2. move toMove-1 keys/children from donor to receiver
for (auto i = 1u; i < toMove; ++i) {
const auto max = findMaxKeyAtPNode(donor, donorBits);
const auto max = findMaxKey(donorKeys, donorBits);
u = BitOperations::getFreeZero(receiverBits);
receiverKeys[u] = donorKeys[max];
receiverChilds[u] = donorChilds[max];
......@@ -782,7 +784,7 @@ class BitPBPTree {
donorBits.reset(max);
}
/// 1.3 set donors new rightmost child and new parent key
const auto dPos = findMaxKeyAtPNode(donor, donorBits);
const auto dPos = findMaxKey(donorKeys, donorBits);
donorChilds[N] = donorChilds[dPos];
parentKeys[pos] = donorKeys[dPos];
donorBits.reset(dPos);
......@@ -797,14 +799,14 @@ class BitPBPTree {
/// 2.2. move toMove-1 keys/children from donor to receiver
for (auto i = 1u; i < toMove; ++i) {
u = BitOperations::getFreeZero(receiverBits);
const auto min = findMinKeyAtPNode(donor, donorBits);
const auto min = findMinKey(donorKeys, donorBits);
receiverKeys[u] = donorKeys[min];
receiverChilds[u] = donorChilds[min];
receiverBits.set(u);
donorBits.reset(min);
}
/// 2.3. set receivers new rightmost child and new parent key
const auto dPos = findMinKeyAtPNode(donor, donorBits);
const auto dPos = findMinKey(donorKeys, donorBits);
receiverChilds[N] = donorChilds[dPos];
parentKeys[pos] = donorKeys[dPos];
donorBits.reset(dPos);
......@@ -847,7 +849,7 @@ class BitPBPTree {
if (key > splitRef.key) {
insertInLeafNodeAtPosition(sibling, BitOperations::getFreeZero(sibRef.bits.get_ro()), key, val);
} else {
if (key > findMaxKeyAtPNode(node, nodeRef.bits.get_ro())) {
if (key > findMaxKey(nodeRef.keys.get_ro(), nodeRef.bits.get_ro())) {
/// Special case: new key would be the middle, thus must be right
insertInLeafNodeAtPosition(sibling, BitOperations::getFreeZero(sibRef.bits.get_ro()), key, val);
splitRef.key = key;
......@@ -1020,12 +1022,13 @@ class BitPBPTree {
}
/// Insert new key and children
auto &hostRef = *host;
auto &hostKeys = hostRef.keys.get_rw();
auto &hostChilds = hostRef.children.get_rw();
auto &hostBits = hostRef.bits.get_rw();
const auto u = BitOperations::getFreeZero(hostBits);
const auto nextPos = findMinKeyGreaterThanAtPNode(host, hostBits, childSplitInfo.key);
const auto nextPos = findMinKeyGreaterThan(hostKeys, hostBits, childSplitInfo.key);
hostRef.keys.get_rw()[u] = childSplitInfo.key;
hostKeys[u] = childSplitInfo.key;
hostChilds[u] = childSplitInfo.leftChild;
hostChilds[nextPos] = childSplitInfo.rightChild;
hostBits.set(u);
......@@ -1068,7 +1071,8 @@ class BitPBPTree {
* @return the position of the key + 1 (or 0 or @c N)
*/
auto lookupPositionInBranchNode(const pptr<BranchNode> &node, const KeyType &key) const {
return findMinKeyGreaterThanAtPNode(node, node->bits.get_ro(), key);
const auto &nodeRef = *node;
return findMinKeyGreaterThan(nodeRef.keys.get_ro(), nodeRef.bits.get_ro(), key);
}
/**
......
......@@ -502,7 +502,7 @@ class FPTree {
if (key > splitRef.key) {
insertInLeafNodeAtPosition(sibling, BitOperations::getFreeZero(sibRef.search.get_ro().b), key, val);
} else {
if (key > findMaxKeyAtPNode(node, nodeRef.search.get_ro().b)) {
if (key > findMaxKey(nodeRef.keys.get_ro(), nodeRef.search.get_ro().b)) {
/// Special case: new key would be the middle, thus must be right
insertInLeafNodeAtPosition(sibling, BitOperations::getFreeZero(sibRef.search.get_ro().b), key, val);
splitRef.key = key;
......@@ -747,6 +747,9 @@ class FPTree {
auto lookupPositionInBranchNode(const BranchNode * const node, const KeyType &key) const {
const auto num = node->numKeys;
const auto &keys = node->keys;
//auto pos = 0u;
//for (; pos < num && keys[pos] <= key; ++pos);
//return pos;
return binarySearch<true>(keys, 0, num - 1, key);
}
......@@ -832,12 +835,14 @@ class FPTree {
/// we have a sibling at the left for rebalancing the keys
balanceLeafNodes(leafRef.prevLeaf, leaf);
nodeRef.keys[pos - 1] = leafRef.keys.get_ro()[findMinKeyAtPNode(leaf, leafRef.search.get_ro().b)];
nodeRef.keys[pos - 1] = leafRef.keys.get_ro()[findMinKey(leafRef.keys.get_ro(),
leafRef.search.get_ro().b)];
} else if (pos < nodeRef.numKeys && leafRef.nextLeaf->search.get_ro().b.count() > middle) {
/// we have a sibling at the right for rebalancing the keys
balanceLeafNodes(leafRef.nextLeaf, leaf);
auto &nextLeaf = *leafRef.nextLeaf;
nodeRef.keys[pos] = nextLeaf.keys.get_ro()[findMinKeyAtPNode(leafRef.nextLeaf, nextLeaf.search.get_ro().b)];
nodeRef.keys[pos] = nextLeaf.keys.get_ro()[findMinKey(nextLeaf.keys.get_ro(),
nextLeaf.search.get_ro().b)];
} else {
/// 2. if this fails we have to merge two leaf nodes but only if both nodes have the same
/// direct parent
......@@ -971,7 +976,7 @@ class FPTree {
if (donorKeys[0] < receiverKeys[0]) {
/// move to a node with larger keys
for (auto i = 0u; i < toMove; ++i) {
const auto max = findMaxKeyAtPNode(donor, donorSearch.b);
const auto max = findMaxKey(donorKeys, donorSearch.b);
const auto pos = BitOperations::getFreeZero(receiverSearch.b);
receiverSearch.b.set(pos);
receiverSearch.fp[pos] = fpHash(donorKeys[max]);
......@@ -982,7 +987,7 @@ class FPTree {
} else {
/// move to a node with smaller keys
for (auto i = 0u; i < toMove; ++i) {
const auto min = findMinKeyAtPNode(donor, donorSearch.b);
const auto min = findMinKey(donorKeys, donorSearch.b);
const auto pos = BitOperations::getFreeZero(receiverSearch.b);
receiverSearch.b.set(pos);
receiverSearch.fp[pos] = fpHash(donorKeys[min]);
......@@ -1171,12 +1176,14 @@ class FPTree {
/* we have to insert a new right child, as the keys in the leafList are sorted */
auto newNode = newBranchNode();
newNode->children[0] = leaf;
splitRef.key = leafRef.keys.get_ro()[findMinKeyAtPNode(leaf, leafRef.search.get_ro().b)];
splitRef.key = leafRef.keys.get_ro()[findMinKey(leafRef.keys.get_ro(),
leafRef.search.get_ro().b)];
splitRef.leftChild = node;
splitRef.rightChild = newNode;
return true;
} else {
nodeRef.keys[nodeRef.numKeys] = leafRef.keys.get_ro()[findMinKeyAtPNode(leaf, leafRef.search.get_ro().b)];
nodeRef.keys[nodeRef.numKeys] = leafRef.keys.get_ro()[findMinKey(leafRef.keys.get_ro(),
leafRef.search.get_ro().b)];
++nodeRef.numKeys;
nodeRef.children[nodeRef.numKeys] = leaf;
return false;
......
......@@ -1112,13 +1112,21 @@ class PBPTree {
pptr<LeafNode> newLeafNode() {
auto pop = pmem::obj::pool_by_vptr(this);
pptr<LeafNode> newNode = nullptr;
transaction::run(pop, [&] { newNode = make_persistent<LeafNode>(); });
if (pmemobj_tx_stage() == TX_STAGE_NONE) {
transaction::run(pop, [&] { newNode = make_persistent<LeafNode>(); });
} else {
newNode = make_persistent<LeafNode>();
}
return newNode;
}
void deleteLeafNode(pptr<LeafNode> &node) {
auto pop = pmem::obj::pool_by_vptr(this);
transaction::run(pop, [&] { delete_persistent<LeafNode>(node); });
if (pmemobj_tx_stage() == TX_STAGE_NONE) {
transaction::run(pop, [&] { delete_persistent<LeafNode>(node); });
} else {
delete_persistent<LeafNode>(node);
}
node = nullptr;
}
......@@ -1128,13 +1136,21 @@ class PBPTree {
pptr<BranchNode> newBranchNode() {
auto pop = pmem::obj::pool_by_vptr(this);
pptr<BranchNode> newNode = nullptr;
transaction::run(pop, [&] { newNode = make_persistent<BranchNode>(); });
if (pmemobj_tx_stage() == TX_STAGE_NONE) {
transaction::run(pop, [&] { newNode = make_persistent<BranchNode>(); });
} else {
newNode = make_persistent<BranchNode>();
}
return newNode;
}
void deleteBranchNode(pptr<BranchNode> &node) {
auto pop = pmem::obj::pool_by_vptr(this);
transaction::run(pop, [&] { delete_persistent<BranchNode>(node); });
if (pmemobj_tx_stage() == TX_STAGE_NONE) {
transaction::run(pop, [&] { delete_persistent<BranchNode>(node); });
} else {
delete_persistent<BranchNode>(node);
}
node = nullptr;
}
......
......@@ -404,11 +404,11 @@ class UnsortedPBPTree {
if (pos > 0 && (prevNumKeys = leafRef.prevLeaf->numKeys.get_ro()) > middle) {
/// we have a sibling at the left for rebalancing the keys
balanceLeafNodes(leafRef.prevLeaf, leaf);
nodeKeys[pos-1] = leafRef.keys.get_ro()[findMinKeyAtPNode(leaf)];
nodeKeys[pos-1] = leafRef.keys.get_ro()[findMinKey(leaf)];
} else if (pos < nNumKeys && (nextNumKeys = leafRef.nextLeaf->numKeys.get_ro() > middle)) {
/// we have a sibling at the right for rebalancing the keys
balanceLeafNodes(leafRef.nextLeaf, leaf);
nodeKeys[pos] = leafRef.nextLeaf->keys.get_ro()[findMinKeyAtPNode(leafRef.nextLeaf)];
nodeKeys[pos] = leafRef.nextLeaf->keys.get_ro()[findMinKey(leafRef.nextLeaf)];
} else {
/// 2. if this fails we have to merge two leaf nodes but only if both nodes have the same
// direct parent
......@@ -500,7 +500,7 @@ class UnsortedPBPTree {
/// move from one node to a node with larger keys
/// move toMove keys/values from donor to receiver
for (auto i = 0u; i < toMove; ++i) {
const auto max = findMaxKeyAtPNode(donor);
const auto max = findMaxKey(donor);
/// move the donor's maximum key to the receiver
receiverKeys[rNumKeys] = donorKeys[max];
receiverVals[rNumKeys] = donorVals[max];
......@@ -515,7 +515,7 @@ class UnsortedPBPTree {
/// mode from one node to a node with smaller keys
/// move toMove keys/values from donor to receiver
for (auto i = 0u; i < toMove; ++i) {
const auto min = findMinKeyAtPNode(donor);
const auto min = findMinKey(donor);
/// move the donor's minimum key to the receiver
receiverKeys[rNumKeys] = donorKeys[min];
receiverVals[rNumKeys] = donorVals[min];
......@@ -880,7 +880,7 @@ class UnsortedPBPTree {
if (key > splitRef.key) {
insertInLeafNodeAtLastPosition(sibling, key, val);
} else {
if (key > findMaxKeyAtPNode(node)) {
if (key > findMaxKey(node)) {
/// Special case: new key would be the middle, thus must be right
insertInLeafNodeAtLastPosition(sibling, key, val);
splitRef.key = key;
......@@ -890,7 +890,7 @@ class UnsortedPBPTree {
}
/* inform the caller about the split */
splitRef.key = sibling->keys.get_ro()[findMinKeyAtPNode(sibling)];
splitRef.key = sibling->keys.get_ro()[findMinKey(sibling)];
split = true;
} else {
/* otherwise, we can simply insert the new entry at the last position */
......@@ -1181,7 +1181,7 @@ class UnsortedPBPTree {
};
/**
* A structure for representing an branch node (branch node) of a B+ tree.
* A structure for representing a branch node (branch node) of a B+ tree.
*/
struct alignas(64) BranchNode {
/**
......
......@@ -416,7 +416,7 @@ class wBPTree {
const auto &key = keys[i];
if (key < minKey) continue;
if (key > maxKey) { higherThanMax = true; continue; }
auto &val = vals[i];
const auto &val = vals[i];
func(key, val);
}
/// move to the next leaf node
......@@ -482,65 +482,65 @@ class wBPTree {
* @param splitInfo[out] information about the split
*/
void splitLeafNode(const pptr<LeafNode> &node, SplitInfo *splitInfo) {
auto &nodeRef = *node;
/// determine the split position
constexpr auto middle = (M + 1) / 2;
auto &nodeRef = *node;
/// determine the split position
constexpr auto middle = (M + 1) / 2;
/// move all entries behind this position to a new sibling node
const auto sibling = newLeafNode();
auto &sibRef = *sibling;
auto &sibSearch = sibRef.search.get_rw();
auto &sibKeys = sibRef.keys.get_rw();
auto &sibVals = sibRef.values.get_rw();
auto &nodeSearch = nodeRef.search.get_rw();
const auto &nodeKeys = nodeRef.keys.get_ro();
const auto &nodeVals = nodeRef.values.get_ro();
sibSearch.slot[0] = nodeSearch.slot[0] - middle;
nodeSearch.slot[0] = middle;
for (auto i = 1u; i < sibSearch.slot[0] + 1; ++i) {
sibSearch.slot[i] = i - 1;
sibSearch.b.set(sibSearch.slot[i]);
sibKeys[sibSearch.slot[i]] = nodeKeys[nodeSearch.slot[i + middle]];
sibVals[sibSearch.slot[i]] = nodeVals[nodeSearch.slot[i + middle]];
}
for (auto i = middle; i < M; ++i)
nodeSearch.b.reset(nodeSearch.slot[i+1]);
PersistEmulation::writeBytes(((M-middle+ sibSearch.slot[0] +7)>>3) +
(sizeof(KeyType)+sizeof(ValueType)+1)*sibSearch.slot[0] + 2); //bits ceiled + entries/slots + numKeys
/// Alternative: copy node, inverse bitmap and shift slots
/*
const auto sibling = newLeafNode(node);
auto &sibRef = *sibling;
auto &sibSearch = sibRef.search.get_rw();
auto &nodeSearch = nodeRef.search.get_rw();
sibSearch.slot[0] = sibSearch.slot[0] - middle;
nodeSearch.slot[0] = middle;
for (auto i = middle; i < M; ++i)
nodeSearch.b.reset(sibSearch.slot[i+1]);
auto b = nodeSearch.b;
sibSearch.b = b.flip();
for (auto i = 1u; i < sibSearch.slot[0] + 1; ++i)
sibSearch.slot[i] = sibSearch.slot[sibSearch.slot[0]+i];
//std::copy(std::begin(sibSlots)+sibSearch.slot[0], std::end(sibSlots), std::begin(sibSlots)+1);
PersistEmulation::writeBytes(sizeof(LeafNode) +
((M-middle+M+7)>>3) + sibSearch.slot[0] + 2); //copy leaf + bits ceiled + slots + numKeys
*/
/// setup the list of leaf nodes
if (nodeRef.nextLeaf != nullptr) {
sibRef.nextLeaf = nodeRef.nextLeaf;
nodeRef.nextLeaf->prevLeaf = sibling;
PersistEmulation::writeBytes<16*2>();
}
nodeRef.nextLeaf = sibling;
sibRef.prevLeaf = node;
/// move all entries behind this position to a new sibling node
const auto sibling = newLeafNode();
auto &sibRef = *sibling;
auto &sibSearch = sibRef.search.get_rw();
auto &sibKeys = sibRef.keys.get_rw();
auto &sibVals = sibRef.values.get_rw();
auto &nodeSearch = nodeRef.search.get_rw();
const auto &nodeKeys = nodeRef.keys.get_ro();
const auto &nodeVals = nodeRef.values.get_ro();
sibSearch.slot[0] = nodeSearch.slot[0] - middle;
nodeSearch.slot[0] = middle;
for (auto i = 1u; i < sibSearch.slot[0] + 1; ++i) {
sibSearch.slot[i] = i - 1;
sibSearch.b.set(sibSearch.slot[i]);
sibKeys[sibSearch.slot[i]] = nodeKeys[nodeSearch.slot[i + middle]];
sibVals[sibSearch.slot[i]] = nodeVals[nodeSearch.slot[i + middle]];
}
for (auto i = middle; i < M; ++i)
nodeSearch.b.reset(nodeSearch.slot[i+1]);
PersistEmulation::writeBytes(((M-middle+ sibSearch.slot[0] +7)>>3) +
(sizeof(KeyType)+sizeof(ValueType)+1)*sibSearch.slot[0] + 2); //bits ceiled + entries/slots + numKeys
/// Alternative: copy node, inverse bitmap and shift slots
/*
const auto sibling = newLeafNode(node);
auto &sibRef = *sibling;
auto &sibSearch = sibRef.search.get_rw();
auto &nodeSearch = nodeRef.search.get_rw();
sibSearch.slot[0] = sibSearch.slot[0] - middle;
nodeSearch.slot[0] = middle;
for (auto i = middle; i < M; ++i)
nodeSearch.b.reset(sibSearch.slot[i+1]);
auto b = nodeSearch.b;
sibSearch.b = b.flip();
for (auto i = 1u; i < sibSearch.slot[0] + 1; ++i)
sibSearch.slot[i] = sibSearch.slot[sibSearch.slot[0]+i];
//std::copy(std::begin(sibSlots)+sibSearch.slot[0], std::end(sibSlots), std::begin(sibSlots)+1);
PersistEmulation::writeBytes(sizeof(LeafNode) +
((M-middle+M+7)>>3) + sibSearch.slot[0] + 2); //copy leaf + bits ceiled + slots + numKeys
*/
/// setup the list of leaf nodes
if (nodeRef.nextLeaf != nullptr) {
sibRef.nextLeaf = nodeRef.nextLeaf;
nodeRef.nextLeaf->prevLeaf = sibling;
PersistEmulation::writeBytes<16*2>();
}
nodeRef.nextLeaf = sibling;
sibRef.prevLeaf = node;
PersistEmulation::writeBytes<16*2>();
auto &splitRef = *splitInfo;
splitRef.leftChild = node;
splitRef.rightChild = sibling;
splitRef.key = sibKeys[sibSearch.slot[1]];
auto &splitRef = *splitInfo;
splitRef.leftChild = node;
splitRef.rightChild = sibling;
splitRef.key = sibKeys[sibSearch.slot[1]];
}
/**
......@@ -753,9 +753,10 @@ class wBPTree {
auto &nodeRef = *node;
auto &nodeSearch = nodeRef.search.get_rw();
if (nodeRef.keys.get_ro()[nodeSearch.slot[pos]] == key) {
nodeSearch.b.reset(pos);
for (auto i = pos; i < nodeSearch.slot[0] + 1; ++i)
nodeSearch.b.reset(nodeSearch.slot[pos]);
for (auto i = pos; i < nodeSearch.slot[0] + 1; ++i) {
nodeSearch.slot[i] = nodeSearch.slot[i + 1];
}
--nodeSearch.slot[0];
return true;
}
......@@ -921,7 +922,6 @@ class wBPTree {
/// 2. if this fails we have to merge two branch nodes
else {
auto newChild = child;
auto ppos = pos;
if (prevNumKeys > 0) {
auto &lSibling = nodeChilds[nodeSearch.slot[pos-1]].branch;
......@@ -941,8 +941,9 @@ class wBPTree {
/// remove key/children ppos from node
auto &nodeSearchW = nodeRef.search.get_rw();
for (auto i = ppos; i < nodeSearch.slot[0] - 1; ++i)
for (auto i = ppos; i < nodeSearch.slot[0] - 1; ++i) {
nodeSearchW.slot[i] = nodeSearch.slot[i + 1];
}
--nodeSearchW.slot[0];
return newChild;
}
......@@ -1114,11 +1115,11 @@ class wBPTree {
/// we move all keys/values from node2 to node1
for (auto i = 1u; i < node2Search.slot[0] + 1; ++i) {
const auto u = BitOperations::getFreeZero(node1Search.b);
node1Keys[u] = node2Keys[node2Search.slot[i]];
node1Vals[u] = node2Vals[node2Search.slot[i]];
node1Search.slot[node1Search.slot[0] + i] = u;
node1Search.b.set(u);
const auto u = BitOperations::getFreeZero(node1Search.b);
node1Keys[u] = node2Keys[node2Search.slot[i]];
node1Vals[u] = node2Vals[node2Search.slot[i]];
node1Search.slot[node1Search.slot[0] + i] = u;
node1Search.b.set(u);
}
node1Search.slot[0] += node2Search.slot[0];
node1Ref.nextLeaf = node2Ref.nextLeaf;
......
This diff is collapsed.
This diff is collapsed.
......@@ -346,7 +346,7 @@ TEST_CASE("Finding the leaf node containing a key", "[BitPBPTree]") {
btree.underflowAtBranchLevel(innerNodes[0], 0, innerNodes[1]);
REQUIRE(node1Ref.bits.get_ro().count() == 1);
REQUIRE((node1Ref.keys.get_ro())[dbis::findMinKeyAtPNode(innerNodes[0], node1Ref.bits.get_ro())] == 9);
REQUIRE((node1Ref.keys.get_ro())[dbis::findMinKey(node1Ref.keys.get_ro(), node1Ref.bits.get_ro())] == 9);
REQUIRE(node2Ref.bits.get_ro().count() == 2);
REQUIRE(node3Ref.bits.get_ro().count() == 2);
......
......@@ -12,6 +12,7 @@ target_link_libraries(MainTest INTERFACE Catch2::Catch)
do_test(PBPTreeTest)
do_test(UnsortedPBPTreeTest)
do_test(BitPBPTreeTest)
do_test(BitHPBPTreeTest)
# FPTree
do_test(HPBPTreeTest)
do_test(FPTreeTest)
......
......@@ -916,6 +916,69 @@ TEST_CASE("Finding the leaf node containing a key", "[FPTree]") {
REQUIRE(num == 50);
}
/* -------------------------------------------------------------------------------------------- */
SECTION("Constructing a LeafList and recover the branches") {
auto btree = rootRef.btree4;
transaction::run(pop, [&] {
if (btree) delete_persistent<FPTreeType4>(btree);
btree = make_persistent<FPTreeType4>();
auto &btreeRef = *btree;
auto prevLeaf = btreeRef.newLeafNode();
auto &pLeafRef = *prevLeaf;
for (auto k = 0u; k < 4; ++k) {
pLeafRef.keys.get_rw()[k] = k + 1;
pLeafRef.values.get_rw()[k] = k + 1;
pLeafRef.search.get_rw().fp[k] = btreeRef.fpHash(k+1);
pLeafRef.search.get_rw().b.set(k);
}
btreeRef.leafList = prevLeaf;
for (auto l = 1u; l < 25; ++l) {
auto newLeaf = btreeRef.newLeafNode();
prevLeaf->nextLeaf = newLeaf;
auto &leafRef = *newLeaf;