UnsortedPBPTree.hpp 45.1 KB
Newer Older
1
/*
2
 * Copyright (C) 2017-2020 DBIS Group - TU Ilmenau, All Rights Reserved.
3
 *
4
 * This file is part of our NVM-based Data Structures repository.
5
 *
6 7 8
 * This program is free software: you can redistribute it and/or modify it under the terms of the
 * GNU General Public License as published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
9
 *
10 11 12
 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 * See the GNU General Public License for more details.
13
 *
14 15
 * You should have received a copy of the GNU General Public License along with this program.
 * If not, see <http://www.gnu.org/licenses/>.
16 17 18 19 20 21 22 23
 */

#ifndef DBIS_UnsortedPBPTree_hpp_
#define DBIS_UnsortedPBPTree_hpp_

#include <array>
#include <iostream>

24
#include <libpmemobj/ctl.h>
25 26 27 28 29
#include <libpmemobj++/make_persistent.hpp>
#include <libpmemobj++/p.hpp>
#include <libpmemobj++/persistent_ptr.hpp>
#include <libpmemobj++/transaction.hpp>
#include <libpmemobj++/utils.hpp>
30

31
#include "config.h"
32 33
#include "utils/PersistEmulation.hpp"
#include "utils/SearchFunctions.hpp"
34

35
namespace dbis::pbptrees {
36

37
using pmem::obj::allocation_flag;
38 39 40 41 42
using pmem::obj::delete_persistent;
using pmem::obj::make_persistent;
using pmem::obj::p;
using pmem::obj::persistent_ptr;
using pmem::obj::transaction;
43 44
template<typename Object>
using pptr = persistent_ptr<Object>;
45 46 47 48 49 50 51 52 53 54 55

/**
  * A persistent memory implementation of a B+ tree.
  *
  * @tparam KeyType the data type of the key
  * @tparam ValueType the data type of the values associated with the key
  * @tparam N the maximum number of keys on a branch node
  * @tparam M the maximum number of keys on a leaf node
  */
template<typename KeyType, typename ValueType, int N, int M>
class UnsortedPBPTree {
56
  /// we need at least two keys on a branch node to be able to split
57
  static_assert(N > 2, "number of branch keys has to be >2.");
58 59
  /// we need an even order for branch nodes to be able to merge
  static_assert(N % 2 == 0, "order of branch nodes must be even.");
60
  /// we need at least one key on a leaf node
61 62 63 64 65 66 67 68
  static_assert(M > 0, "number of leaf keys should be >0.");

#ifndef UNIT_TESTS
  private:
#else
  public:
#endif

69
  /// Forward declarations
70 71 72
  struct LeafNode;
  struct BranchNode;

73 74
  struct Node {
    Node() : tag(BLANK) {};
75

76
    Node(pptr<LeafNode> leaf_) : tag(LEAF), leaf(leaf_) {};
77

78
    Node(pptr<BranchNode> branch_) : tag(BRANCH), branch(branch_) {};
79

80
    Node(const Node &other) { copy(other); };
81

82
    void copy(const Node &other) throw() {
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
      tag = other.tag;

      switch (tag) {
        case LEAF: {
          leaf = other.leaf;
          break;
        }
        case BRANCH: {
          branch = other.branch;
          break;
        }
        default: break;
      }
    }

98
    Node &operator=(Node other) {
99 100 101 102 103 104 105 106
      copy(other);
      return *this;
    }

    enum NodeType {
      BLANK, LEAF, BRANCH
    } tag;
    union {
107 108
      pptr<LeafNode> leaf;
      pptr<BranchNode> branch;
109 110 111 112
    };
  };

  /**
113
   * A structure for passing information about a node split to the caller.
114 115
   */
  struct SplitInfo {
116 117 118
    KeyType key;     ///< the key at which the node was split
    Node leftChild;  ///< the resulting lhs child node
    Node rightChild; ///< the resulting rhs child node
119 120
  };

121 122
  static constexpr pobj_alloc_class_desc AllocClass{256, 64, 1, POBJ_HEADER_COMPACT};
  pobj_alloc_class_desc alloc_class;
123 124 125 126
  p<unsigned int> depth; /**< the depth of the tree, i.e. the number of levels
                              (0 => rootNode is LeafNode) */
  Node rootNode; /**< pointer to the root node (an instance of @c LeafNode or @c BranchNode).
                      This pointer is never @c nullptr. */
127

128
 public:
129 130 131 132 133

  /**
   * Iterator for iterating over the leaf nodes
   */
  class iterator {
134
    pptr<LeafNode> currentNode;
135 136 137 138 139
    std::size_t currentPosition;

    public:
    iterator() : currentNode(nullptr), currentPosition(0) {}

140
    iterator(const Node &root, std::size_t d) {
141
      /// traverse to left-most key
142
      auto node = root;
143
      while (d-- > 0) node = node.branch->children.get_ro()[0];
144 145 146 147 148
      currentNode = node.leaf;
      currentPosition = 0;
    }

    iterator &operator++() {
149 150 151
      const auto &nodeRef = *currentNode;
      if (currentPosition >= nodeRef.numKeys.get_ro() - 1) {
        currentNode = nodeRef.nextLeaf;
152 153
        currentPosition = 0;
      } else {
154
        ++currentPosition;
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
      }
      return *this;
    }

    iterator operator++(int) {
      iterator retval = *this;
      ++(*this);
      return retval;
    }

    bool operator==(iterator other) const {
      return (currentNode == other.currentNode &&
          currentPosition == other.currentPosition);
    }

    bool operator!=(iterator other) const { return !(*this == other); }

    std::pair<KeyType, ValueType> operator*() {
173 174 175
      const auto &nodeRef = *currentNode;
      return std::make_pair(nodeRef.keys.get_ro()[currentPosition],
                            nodeRef.values.get_ro()[currentPosition]);
176 177
    }

178
    /// iterator traits
179 180 181 182 183 184 185 186 187 188
    using difference_type = long;
    using value_type = std::pair<KeyType, ValueType>;
    using pointer = const std::pair<KeyType, ValueType> *;
    using reference = const std::pair<KeyType, ValueType> &;
    using iterator_category = std::forward_iterator_tag;
  };

  iterator begin() { return iterator(rootNode, depth); }

  iterator end() { return iterator(); }
Philipp Götze's avatar
Philipp Götze committed
189

190 191 192 193 194
  /**
   * Typedef for a function passed to the scan method.
   */
  using ScanFunc = std::function<void(const KeyType &key, const ValueType &val)>;

195 196
  /* -------------------------------------------------------------------------------------------- */

197 198 199
  /**
   * Constructor for creating a new B+ tree.
   */
200 201
  explicit UnsortedPBPTree(struct pobj_alloc_class_desc _alloc) : depth(0), alloc_class(_alloc) {
  // UnsortedPBPTree() : depth(0) {
202 203 204 205 206 207 208 209
    rootNode = newLeafNode();
    LOG("created new tree with sizeof(BranchNode) = " << sizeof(BranchNode)
      << ", sizeof(LeafNode) = " << sizeof(LeafNode));
  }

  /**
   * Destructor for the B+ tree. Should delete all allocated nodes.
   */
210
  ~UnsortedPBPTree() {}
211 212

  /**
213 214
   * Insert an element (a key-value pair) into the B+ tree. If the key @c key already exists, the
   * corresponding value is replaced by @c val.
215 216 217 218 219 220 221 222
   *
   * @param key the key of the element to be inserted
   * @param val the value that is associated with the key
   */
  void insert(const KeyType &key, const ValueType &val) {
        SplitInfo splitInfo;

        bool wasSplit = false;
223 224
        if (depth.get_ro() == 0) {
        /// the root node is a leaf node
225 226 227
        auto n = rootNode.leaf;
        wasSplit = insertInLeafNode(n, key, val, &splitInfo);
        } else {
228
        /// the root node is a branch node
229 230 231 232
        auto n = rootNode.branch;
        wasSplit = insertInBranchNode(n, depth, key, val, &splitInfo);
        }
        if (wasSplit) {
233
        /// we had an overflow in the node and therefore the node is split
234
        auto root = newBranchNode();
235
        auto &rootRef = *root;
236
        auto &rootChilds = rootRef.children.get_rw();
237
        rootRef.keys.get_rw()[0] = splitInfo.key;
238 239
        rootChilds[0] = splitInfo.leftChild;
        rootChilds[1] = splitInfo.rightChild;
240
        rootRef.numKeys.get_rw() = 1;
241
        rootNode.branch = root;
242
        ++depth.get_rw();
243 244 245 246
        }
  }

  /**
247
   * Find the given @c key in the B+ tree and if found return the corresponding value.
248 249
   *
   * @param key the key we are looking for
250
   * @param[out] val a pointer to memory where the value is stored if the key was found
251 252 253 254 255 256
   * @return true if the key was found, false otherwise
   */
  bool lookup(const KeyType &key, ValueType *val) const {
    assert(val != nullptr);
    bool result = false;

257 258 259 260 261 262
    const auto leaf = findLeafNode(key);
    auto &leafRef = *leaf;
    auto pos = lookupPositionInLeafNode(leaf, key);
    if (pos < leafRef.numKeys.get_ro() && leafRef.keys.get_ro()[pos] == key) {
      /// we found it!
      *val = leafRef.values.get_ro()[pos];
263 264 265 266 267 268 269 270 271 272 273 274 275
      result = true;
    }
    return result;
  }

  /**
   * Delete the entry with the given key @c key from the tree.
   *
   * @param key the key of the entry to be deleted
   * @return true if the key was found and deleted
   */
  bool erase(const KeyType &key) {
    bool result;
276 277
      if (depth.get_ro() == 0) {
        /// special case: the root node is a leaf node and there is no need to handle underflow
278 279
        auto node = rootNode.leaf;
        assert(node != nullptr);
280
        result = eraseFromLeafNode(node, key);
281
      } else {
282 283
        auto node = rootNode.branch;
        assert(node != nullptr);
284
        result = eraseFromBranchNode(node, depth, key);
285
      }
286 287
    return result;
  }
288 289 290

  void recover() {}

291 292 293 294
  /**
   * Print the structure and content of the B+ tree to stdout.
   */
  void print() const {
295 296
    if (depth.get_ro() == 0) printLeafNode(0, rootNode.leaf);
    else printBranchNode(0u, rootNode.branch);
297 298 299
  }

  /**
300 301
   * Perform a scan over all key-value pairs stored in the B+ tree. For each entry the given
   * function @func is called.
302 303 304 305
   *
   * @param func the function called for each entry
   */
  void scan(ScanFunc func) const {
306
    /// we traverse to the leftmost leaf node
307 308
    auto node = rootNode;
    auto d = depth.get_ro();
309
    while (d-- > 0) node = node.branch->children.get_ro()[0];
310 311
    auto leaf = node.leaf;
    while (leaf != nullptr) {
312 313
      const auto &leafRef = *leaf;
      /// for each key-value pair call func
314 315 316 317 318 319
      const auto &numKeys = leafRef.numKeys.get_ro();
      const auto &leafKeys = leafRef.keys.get_ro();
      const auto &leafVals = leafRef.values.get_ro();
      for (auto i = 0u; i < numKeys; ++i) {
        const auto &key = leafKeys[i];
        const auto &val = leafVals[i];
320 321
        func(key, val);
      }
322 323
      /// move to the next leaf node
      leaf = leafRef.nextLeaf;
324 325 326 327
    }
  }

  /**
328 329
   * Perform a range scan over all elements within the range [minKey, maxKey] and for each element
   * call the given function @c func.
330 331 332 333 334 335
   *
   * @param minKey the lower boundary of the range
   * @param maxKey the upper boundary of the range
   * @param func the function called for each entry
   */
  void scan(const KeyType &minKey, const KeyType &maxKey, ScanFunc func) const {
336
    const auto leaf = findLeafNode(minKey);
337

338 339
    bool higherThanMax = false;
    while (!higherThanMax && leaf != nullptr) {
340 341
      const auto &leafRef = *leaf;
      /// for each key-value pair within the range call func
342 343 344 345 346 347 348 349
      const auto &numKeys = leafRef.numKeys.get_ro();
      const auto &leafKeys = leafRef.keys.get_ro();
      const auto &leafVals = leafRef.values.get_ro();
      for (auto i = 0u; i < numKeys; ++i) {
        auto &key = leafKeys[i];
        if (key < minKey) continue;
        if (key > maxKey) { higherThanMax = true; continue; }
        auto &val = leafVals[i];
350 351
        func(key, val);
      }
352 353
      /// move to the next leaf node
      leaf = leafRef.nextLeaf;
354 355 356 357 358 359
    }
  }

#ifndef UNIT_TESTS
  private:
#endif
360 361 362
  /* -------------------------------------------------------------------------------------------- */
  /* DELETE AT LEAF LEVEL                                                                         */
  /* -------------------------------------------------------------------------------------------- */
363 364 365 366 367 368 369 370

  /**
   * Delete the element with the given key from the given leaf node.
   *
   * @param node the leaf node from which the element is deleted
   * @param key the key of the element to be deleted
   * @return true of the element was deleted
   */
371
  bool eraseFromLeafNode(const pptr<LeafNode> &node, const KeyType &key) {
372
    auto pos = lookupPositionInLeafNode(node, key);
373 374 375 376 377
    if (node->keys.get_ro()[pos] == key) {
      return eraseFromLeafNodeAtPosition(node, pos, key);
    } else {
      return false;
    }
378 379 380 381 382 383 384 385 386 387 388 389 390
  }

  /**
   * Delete the element with the given position and key from the given leaf node.
   *
   * @param node the leaf node from which the element is deleted
   * @param pos the position of the key in the node
   * @param key the key of the element to be deleted
   * @return true of the element was deleted
   */
  bool eraseFromLeafNodeAtPosition(const pptr<LeafNode> &node, const unsigned int pos,
                                   const KeyType &key) {
    auto &nodeRef = *node;
391
    // if (nodeRef.keys.get_ro()[pos] == key) {
392 393 394
      auto &numKeys = nodeRef.numKeys.get_rw();
      auto &nodeKeys = nodeRef.keys.get_rw();
      auto &nodeVals = nodeRef.values.get_rw();
395
      /// simply copy the last object on the node to the position of the erased object
396
      --numKeys;
397 398 399 400 401 402
      if (pos != numKeys) {
        nodeKeys[pos] = nodeKeys[numKeys];
        nodeVals[pos] = nodeVals[numKeys];
      }
      PersistEmulation::writeBytes(sizeof(size_t) +
                                   (pos != numKeys ? sizeof(KeyType) + sizeof(ValueType) : 0));
403
      return true;
404 405
    // }
    // return false;
406 407 408
  }

  /**
409 410
   * Handle the case that during a delete operation a underflow at node @c leaf occured. If possible
   * this is handled
411 412 413 414
   * (1) by rebalancing the elements among the leaf node and one of its siblings
   * (2) if not possible by merging with one of its siblings.
   *
   * @param node the parent node of the node where the underflow occured
415
   * @param pos the position of the child node @leaf in the @c children array of the branch node
416 417
   * @param leaf the node at which the underflow occured
   */
418 419 420 421
  void underflowAtLeafLevel(const pptr<BranchNode> &node, unsigned int pos,
                            const pptr<LeafNode> &leaf) {
    auto &nodeRef = *node;
    const auto &leafRef = *leaf;
422 423 424 425
    auto &nodeKeys =nodeRef.keys.get_rw();
    const auto nNumKeys = nodeRef.numKeys.get_ro();
    auto prevNumKeys = 0u, nextNumKeys = 0u;
    assert(pos <= nNumKeys);
426 427 428 429

    constexpr auto middle = (M + 1) / 2;
    /// 1. we check whether we can rebalance with one of the siblings but only if both nodes have
    //     the same direct parent
430
    if (pos > 0 && (prevNumKeys = leafRef.prevLeaf->numKeys.get_ro()) > middle) {
431 432
      /// we have a sibling at the left for rebalancing the keys
      balanceLeafNodes(leafRef.prevLeaf, leaf);
433
      nodeKeys[pos-1] = leafRef.keys.get_ro()[findMinKey(leaf)];
434
    } else if (pos < nNumKeys && (nextNumKeys = leafRef.nextLeaf->numKeys.get_ro() > middle)) {
435 436
      /// we have a sibling at the right for rebalancing the keys
      balanceLeafNodes(leafRef.nextLeaf, leaf);
437
      nodeKeys[pos] = leafRef.nextLeaf->keys.get_ro()[findMinKey(leafRef.nextLeaf)];
438 439 440 441
    } else {
      /// 2. if this fails we have to merge two leaf nodes but only if both nodes have the same
      //     direct parent
      pptr<LeafNode> survivor = nullptr;
442
      if (pos > 0 && prevNumKeys <= middle) {
443 444
        survivor = mergeLeafNodes(leafRef.prevLeaf, leaf);
        deleteLeafNode(leaf);
445
      } else if (pos < nNumKeys && nextNumKeys <= middle) {
446 447
        /// because we update the pointers in mergeLeafNodes we keep it here
        survivor = mergeLeafNodes(leaf, leafRef.nextLeaf);
448 449 450 451 452
        deleteLeafNode(leafRef.nextLeaf);
      } else  assert(false); ///< this shouldn't happen?!

      if (nNumKeys > 1) {
        if (pos > 0) --pos;
453
        /// just remove the child node from the current branch node
454 455 456 457
        auto &nodeChilds = nodeRef.children.get_rw();
        for (auto i = pos; i < nNumKeys - 1; ++i) {
          nodeKeys[i] = nodeKeys[i + 1];
          nodeChilds[i + 1] = nodeChilds[i + 2];
458
        }
459 460
        nodeChilds[pos] = survivor;
        --nodeRef.numKeys.get_rw();
461 462 463 464
      } else {
        /// This is a special case that happens only if the current node is the root node. Now, we
        /// have to replace the branch root node by a leaf node.
        rootNode = survivor;
465
        --depth.get_rw();
466 467
      }
    }
468
  }
469 470

  /**
471
   * Merge two leaf nodes by moving all elements from @c node2 to @c node1.
472 473 474 475 476
   *
   * @param node1 the target node of the merge
   * @param node2 the source node
   * @return the merged node (always @c node1)
   */
477
  pptr<LeafNode> mergeLeafNodes(const pptr<LeafNode> &node1, const pptr<LeafNode> &node2) {
478 479
    assert(node1 != nullptr);
    assert(node2 != nullptr);
480 481
    auto &node1Ref = *node1;
    auto &node2Ref = *node2;
482 483 484
    auto &n1NumKeys = node1Ref.numKeys.get_rw();
    const auto &n2NumKeys = node2Ref.numKeys.get_ro();
    assert(n1NumKeys + n2NumKeys <= M);
485 486

    /// we move all keys/values from node2 to node1
487 488 489 490 491 492 493
    auto &node1Keys = node1Ref.keys.get_rw();
    auto &node1Vals = node1Ref.values.get_rw();
    const auto &node2Keys = node2Ref.keys.get_ro();
    const auto &node2Vals = node2Ref.values.get_ro();
    for (auto i = 0u; i < n2NumKeys; ++i) {
      node1Keys[n1NumKeys + i] = node2Keys[i];
      node1Vals[n1NumKeys + i] = node2Vals[i];
494
    }
495
    n1NumKeys += n2NumKeys;
496
    node1Ref.nextLeaf = node2Ref.nextLeaf;
497 498 499 500 501 502 503 504
    if (node2Ref.nextLeaf != nullptr) {
      node2Ref.nextLeaf->prevLeaf = node1;
      PersistEmulation::writeBytes<16>();
    }
    PersistEmulation::writeBytes(
        n2NumKeys * (sizeof(KeyType) + sizeof(ValueType)) +  ///< moved keys, vals
        sizeof(unsigned int) + 16                            ///< numKeys + nextpointer
    );
505 506 507 508
    return node1;
  }

  /**
509 510 511
   * Redistribute (key, value) pairs from the leaf node @c donor to the leaf node @c receiver such
   * that both nodes have approx. the same number of elements. This method is used in case of an
   * underflow situation of a leaf node.
512 513 514 515
   *
   * @param donor the leaf node from which the elements are taken
   * @param receiver the sibling leaf node getting the elements from @c donor
   */
516 517 518
  void balanceLeafNodes(const pptr<LeafNode> &donor, const pptr<LeafNode> &receiver) {
    auto &donorRef = *donor;
    auto &receiverRef = *receiver;
519 520 521
    auto &dNumKeys = donorRef.numKeys.get_rw();
    auto &rNumKeys = receiverRef.numKeys.get_rw();
    assert(dNumKeys > rNumKeys);
522

523 524
    const auto balancedNum = (dNumKeys + rNumKeys) / 2;
    const auto toMove = dNumKeys - balancedNum;
525 526
    if (toMove == 0) return;

527 528 529 530 531 532 533
    auto &receiverKeys = receiverRef.keys.get_rw();
    auto &receiverVals = receiverRef.values.get_rw();
    auto &donorKeys = donorRef.keys.get_rw();
    auto &donorVals = donorRef.values.get_rw();

    if (donorKeys[0] < receiverKeys[0]) {
      /// move from one node to a node with larger keys
534
      /// move toMove keys/values from donor to receiver
535
      for (auto i = 0u; i < toMove; ++i) {
536
        const auto max = findMaxKey(donor);
537
        /// move the donor's maximum key to the receiver
538 539 540
        receiverKeys[rNumKeys] = donorKeys[max];
        receiverVals[rNumKeys] = donorVals[max];
        ++rNumKeys;
541 542

        /// fill empty space in donor with its currently last key
543
        --dNumKeys;
544 545
        donorKeys[max] = donorKeys[dNumKeys];
        donorVals[max] = donorVals[dNumKeys];
546 547
      }
    } else {
548 549
      /// mode from one node to a node with smaller keys
      /// move toMove keys/values from donor to receiver
550
      for (auto i = 0u; i < toMove; ++i) {
551
        const auto min = findMinKey(donor);
552
        /// move the donor's minimum key to the receiver
553 554 555
        receiverKeys[rNumKeys] = donorKeys[min];
        receiverVals[rNumKeys] = donorVals[min];
        ++rNumKeys;
556 557

        /// fill empty space in donor with its currently last key
558
        --dNumKeys;
559 560
        donorKeys[min] = donorKeys[dNumKeys];
        donorVals[min] = donorVals[dNumKeys];
561 562
      }
    }
563
    PersistEmulation::writeBytes(2 * toMove * (sizeof(KeyType) + sizeof(ValueType)) +
564
                                 2* sizeof(LeafNode::numKeys));
565
  }
566 567 568 569 570

  /* -------------------------------------------------------------------------------------------- */
  /* DELETE AT INNER LEVEL                                                                        */
  /* -------------------------------------------------------------------------------------------- */

571
  /**
572 573
   * Delete an entry from the tree by recursively going down to the leaf level and handling the
   * underflows.
574 575 576 577 578 579
   *
   * @param node the current branch node
   * @param d the current depth of the traversal
   * @param key the key to be deleted
   * @return true if the entry was deleted
   */
580
  bool eraseFromBranchNode(const pptr<BranchNode> &node, const unsigned int d, const KeyType &key) {
581
    assert(d >= 1);
582
    const auto &nodeRef = *node;
583
    bool deleted = false;
584
    /// try to find the branch
585
    auto pos = lookupPositionInBranchNode(node, key);
586
    auto n = nodeRef.children.get_ro()[pos];
587
    if (d == 1) {
588
      /// the next level is the leaf level
589
      auto leaf = n.leaf;
590
      auto &leafRef = *leaf;
591 592
      assert(leaf != nullptr);
      deleted = eraseFromLeafNode(leaf, key);
593 594 595
      constexpr auto middle = (M + 1) / 2;
      if (leafRef.numKeys.get_ro() < middle) {
        /// handle underflow
596 597 598 599 600 601
        underflowAtLeafLevel(node, pos, leaf);
      }
    } else {
      auto child = n.branch;
      deleted = eraseFromBranchNode(child, d - 1, key);
      pos = lookupPositionInBranchNode(node, key);
602 603 604
      constexpr auto middle = (N + 1) / 2;
      if (child->numKeys.get_ro() < middle) {
        /// handle underflow
605
        child = underflowAtBranchLevel(node, pos, child);
606 607
        if (d == depth.get_ro() && nodeRef.numKeys.get_ro() == 0) {
          /// special case: the root node is empty now
608
          rootNode = child;
609
          --depth.get_rw();
610 611 612 613 614 615 616
        }
      }
    }
    return deleted;
  }

  /**
617 618
   * Merge two branch nodes my moving all keys/children from @c node to @c sibling and put the key
   * @c key from the parent node in the middle. The node @c node should be deleted by the caller.
619 620 621 622 623
   *
   * @param sibling the left sibling node which receives all keys/children
   * @param key the key from the parent node that is between sibling and node
   * @param node the node from which we move all keys/children
   */
624
  void mergeBranchNodes(const pptr<BranchNode> &sibling, const KeyType &key,
625
                        const pptr<BranchNode> &node){
626 627 628
    assert(sibling != nullptr);
    assert(node != nullptr);

629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645
    const auto &nodeRef = *node;
    auto &sibRef = *sibling;
    const auto &nodeKeys = nodeRef.keys.get_ro();
    const auto &nodeChilds = nodeRef.children.get_ro();
    const auto &nNumKeys = nodeRef.numKeys.get_ro();
    auto &sibKeys = sibRef.keys.get_rw();
    auto &sibChilds = sibRef.children.get_rw();
    auto &sNumKeys = sibRef.numKeys.get_rw();

    assert(key <= nodeKeys[0]);
    assert(sibKeys[sNumKeys - 1] < key);

    sibKeys[sNumKeys] = key;
    sibChilds[sNumKeys + 1] = nodeChilds[0];
    for (auto i = 0u; i < nNumKeys; ++i) {
      sibKeys[sNumKeys + i + 1] = nodeKeys[i];
      sibChilds[sNumKeys + i + 2] = nodeChilds[i + 1];
646
    }
647
    sNumKeys += nNumKeys + 1;
648 649 650
  }

  /**
651 652 653
   * Handle the case that during a delete operation a underflow at node @c child occured where @c
   * node is the parent node. If possible this is handled
   * (1) by rebalancing the elements among the node @c child and one of its siblings
654 655 656
   * (2) if not possible by merging with one of its siblings.
   *
   * @param node the parent node of the node where the underflow occured
657
   * @param pos the position of the child node @child in the @c children array of the branch node
658 659 660
   * @param child the node at which the underflow occured
   * @return the (possibly new) child node (in case of a merge)
   */
661 662
  pptr<BranchNode> underflowAtBranchLevel(const pptr<BranchNode> &node, unsigned int pos,
                                          const pptr<BranchNode> &child) {
663 664
    assert(node != nullptr);
    assert(child != nullptr);
665 666 667 668 669
    auto &nodeRef = *node;
    const auto &nodeKeys = nodeRef.keys.get_ro();
    const auto &nodeChilds = nodeRef.children.get_ro();
    const auto nNumKeys = nodeRef.numKeys.get_ro();
    auto prevNumKeys = 0u, nextNumKeys = 0u;
670
    constexpr auto middle = (N + 1) / 2;
671

672
    /// 1. we check whether we can rebalance with one of the siblings
673
    if (pos > 0 && (prevNumKeys = nodeChilds[pos - 1].branch->numKeys.get_ro()) > middle) {
674
      /// we have a sibling at the left for rebalancing the keys
675
      auto sibling = nodeChilds[pos - 1].branch;
676
      balanceBranchNodes(sibling, child, node, pos - 1);
677 678 679
      return child;
    } else if (pos < nNumKeys &&
              (nextNumKeys = nodeChilds[pos + 1].branch->numKeys.get_ro()) > middle) {
680
      /// we have a sibling at the right for rebalancing the keys
681
      auto sibling = (nodeChilds[pos + 1]).branch;
682
      balanceBranchNodes(sibling, child, node, pos);
683
      return child;
684
    } else {
685
      /// 2. if this fails we have to merge two branch nodes
686
      auto newChild = child;
687 688

      auto ppos = pos;
689 690 691
      if (prevNumKeys > 0) {
        auto &lSibling = nodeChilds[pos - 1].branch;
        mergeBranchNodes(lSibling, nodeKeys[pos - 1], child);
692
        ppos = pos - 1;
693
        deleteBranchNode(child);
694
        newChild = lSibling;
695 696 697 698 699 700 701 702 703
      } else if (nextNumKeys > 0) {
        auto &rSibling = nodeChilds[pos + 1].branch;
        mergeBranchNodes(child, nodeKeys[pos], rSibling);
        deleteBranchNode(rSibling);
      } else assert(false); ///< shouldn't happen

      /// remove nodeKeys[pos] from node
      for (auto i = ppos; i < nNumKeys - 1; ++i) {
        nodeRef.keys.get_rw()[i] = nodeKeys[i + 1];
704
      }
705 706 707 708
      if (pos == 0) ++pos;
      for (auto i = pos; i < nNumKeys; ++i) {
        if (i + 1 <= nNumKeys){
          nodeRef.children.get_rw()[i] = nodeChilds[i + 1];
709 710
        }
      }
711
      --nodeRef.numKeys.get_rw();
712 713 714 715 716 717

      return newChild;
    }
  }

  /**
718 719 720
   * Rebalance two branch nodes by moving some key-children pairs from the node @c donor to the node
   * @receiver via the parent node @parent. The position of the key between the two nodes is denoted
   * by @c pos.
721 722 723 724
   *
   * @param donor the branch node from which the elements are taken
   * @param receiver the sibling branch node getting the elements from @c donor
   * @param parent the parent node of @c donor and @c receiver
725
   * @param pos the position of the key in node @c parent that lies between @c donor and @c receiver
726
   */
727 728 729 730 731
  void balanceBranchNodes(const pptr<BranchNode> &donor, const pptr<BranchNode> &receiver,
                          const pptr<BranchNode> &parent, const unsigned int pos) {
    auto &donorRef = *donor;
    auto &receiverRef = *receiver;
    auto &parentRef = *parent;
732 733 734 735 736 737 738 739 740 741 742
    auto &donorKeys = donorRef.keys.get_rw();
    auto &donorChilds = donorRef.children.get_rw();
    auto &dNumKeys = donorRef.numKeys.get_rw();
    auto &receiverKeys = receiverRef.keys.get_rw();
    auto &receiverChilds = receiverRef.children.get_rw();
    auto &rNumKeys = receiverRef.numKeys.get_rw();
    auto &parentKeys = parentRef.keys.get_rw();
    assert(dNumKeys > rNumKeys);

    const auto balancedNum = (dNumKeys + rNumKeys) / 2;
    const auto toMove = dNumKeys - balancedNum;
743
    if (toMove == 0) return;
744 745

    if (donorKeys[0] < receiverKeys[0]) {
746
      /// move from one node to a node with larger keys
747 748
      unsigned int i = 0;

749
      /// 1. make room
750 751
      receiverChilds[rNumKeys + toMove] = receiverChilds[rNumKeys];
      for (i = rNumKeys; i > 0; --i) {
752
        /// reserve space on receiver side
753 754
        receiverKeys[i + toMove - 1] = receiverKeys[i - 1];
        receiverChilds[i + toMove - 1] = receiverChilds[i - 1];
755
      }
756
      /// 2. move toMove keys/children from donor to receiver
757 758
      for (i = 0; i < toMove; ++i) {
        receiverChilds[i] = donorChilds[dNumKeys - toMove + 1 + i];
759
      }
760 761
      for (i = 0; i < toMove - 1; ++i) {
        receiverKeys[i] = donorKeys[dNumKeys - toMove + 1 + i];
762
      }
763
      receiverKeys[toMove - 1] = parentKeys[pos];
764
      assert(parentRef.numKeys.get_ro() > pos);
765 766
      parentKeys[pos] = donorKeys[dNumKeys - toMove];
      rNumKeys += toMove;
767
    } else {
768
      /// mode from one node to a node with smaller keys
769
      unsigned int i = 0, n = rNumKeys;
770

771
      /// 1. move toMove keys/children from donor to receiver
772 773 774
      for (i = 0; i < toMove; ++i) {
        receiverChilds[n + 1 + i] = donorChilds[i];
        receiverKeys[n + 1 + i] = donorKeys[i];
775
      }
776 777 778 779
      /// 2. we have to move via the parent node: take the key from parentKeys[pos]
      receiverKeys[n] = parentKeys[pos];
      rNumKeys = rNumKeys + toMove;
      KeyType key = donorKeys[toMove - 1];
780 781

      /// 3. on donor node move all keys and values to the left
782 783 784
      for (i = 0; i < dNumKeys - toMove; ++i) {
        donorKeys[i] = donorKeys[toMove + i];
        donorChilds[i] = donorChilds[toMove + i];
785
      }
786 787
      donorChilds[dNumKeys - toMove] = donorChilds[dNumKeys];
      /// and replace this key by donorKeys[0]
788
      assert(parentRef.numKeys.get_ro() > pos);
789
      parentKeys[pos] = key;
790
    }
791
    dNumKeys -= toMove;
792 793
  }

794 795 796
  /* -------------------------------------------------------------------------------------------- */
  /* DEBUGGING                                                                                    */
  /* -------------------------------------------------------------------------------------------- */
797 798

  /**
799
   * Print the given branch node @c node and all its children to standard output.
800 801 802 803
   *
   * @param d the current depth used for indention
   * @param node the tree node to print
   */
804 805
  void printBranchNode(const unsigned int d, const pptr<BranchNode> &node) const {
    const auto &nodeRef = *node;
806 807 808 809 810
    const auto &nodeKeys = nodeRef.keys.get_ro();
    const auto &nodeChilds = nodeRef.children.get_ro();
    const auto nNumKeys = nodeRef.numKeys.get_ro();

    for (auto i = 0u; i < d; ++i) std::cout << "  ";
811
    std::cout << d << " { ";
812
    for (auto k = 0u; k < nNumKeys; ++k) {
813
      if (k > 0) std::cout << ", ";
814
      std::cout << nodeKeys[k];
815 816
    }
    std::cout << " }" << std::endl;
817
    for (auto k = 0u; k <= nNumKeys; ++k) {
818
      if (d + 1 < depth) {
819
        auto child = nodeChilds[k].branch;
820 821
        if (child != nullptr) printBranchNode(d + 1, child);
      } else {
822
        auto leaf = (nodeChilds[k]).leaf;
823 824 825 826 827 828
        printLeafNode(d+1,leaf);
      }
    }
  }

  /**
829
   * Print the keys of the given branch node @c node to standard output.
830 831 832
   *
   * @param node the tree node to print
   */
833 834
  void printBranchNodeKeys(const pptr<BranchNode> &node) const {
    const auto &nodeRef = *node;
835 836 837
    const auto &nodeKeys = nodeRef.keys.get_ro();
    const auto nNumKeys = nodeRef.numKeys.get_ro();

838
    std::cout << "{ ";
839
    for (auto k = 0u; k < nNumKeys; ++k) {
840
      if (k > 0) std::cout << ", ";
841
      std::cout << nodeKeys[k];
842 843 844 845 846 847 848 849 850 851
    }
    std::cout << " }" << std::endl;
  }

  /**
   * Print the given leaf node @c node to standard output.
   *
   * @param d the current depth used for indention
   * @param node the tree node to print
   */
852 853
  void printLeafNode(const unsigned int d, const pptr<LeafNode> &node) const {
    const auto &nodeRef = *node;
854 855 856 857
    const auto &nodeKeys = nodeRef.keys.get_ro();
    const auto nNumKeys = nodeRef.numKeys.get_ro();

    for (auto i = 0u; i < d; ++i) std::cout << "  ";
858
    std::cout << "[" << std::hex << node << std::dec << " : ";
859
    for (auto i = 0u; i < nNumKeys; ++i) {
860
      if (i > 0) std::cout << ", ";
861
      std::cout << "{" << nodeKeys[i]<< "}";
862 863 864 865 866 867 868 869
    }
    std::cout << "]" << std::endl;
  }
  /**
   * Print the keys of the  given leaf node @c node to standard output.
   *
   * @param node the tree node to print
   */
870 871
  void printLeafNodeKeys(const pptr<LeafNode> &node) const {
    const auto &nodeRef = *node;
872 873 874
    const auto &nodeKeys = nodeRef.keys.get_ro();
    const auto nNumKeys = nodeRef.numKeys.get_ro();

875
    std::cout<<"[";
876
    for (auto i = 0u; i < nNumKeys; ++i) {
877
      if (i > 0) std::cout << ", ";
878
      std::cout << "{" << nodeKeys[i] << "}";
879 880 881 882
    }
    std::cout << "]" << std::endl;
  }

883 884 885
  /* -------------------------------------------------------------------------------------------- */
  /* INSERT                                                                                       */
  /* -------------------------------------------------------------------------------------------- */
886 887

  /**
888 889 890
   * Insert a (key, value) pair into the corresponding leaf node. It is the responsibility of the
   * caller to make sure that the node @c node is the correct node. The key is inserted at the last
   * position.
891 892 893 894 895 896
   *
   * @param node the node where the key-value pair is inserted.
   * @param key the key to be inserted
   * @param val the value associated with the key
   * @param splitInfo information about a possible split of the node
   */
897 898
  bool insertInLeafNode(const pptr<LeafNode> &node, const KeyType &key, const ValueType &val,
                        SplitInfo *splitInfo) {
899
    bool split = false;
Philipp Götze's avatar
Philipp Götze committed
900
    auto &nodeRef = *node;
901 902 903
    const auto &numKeys = nodeRef.numKeys.get_ro();
    auto pos = lookupPositionInLeafNode(node, key);
    if (pos < numKeys && nodeRef.keys.get_ro()[pos] == key) {
904
      /// handle insert of duplicates
905
      nodeRef.values.get_rw()[pos] = val;
906 907
      return false;
    }
908
    if (numKeys == M) {
909
      /* split the node */
Philipp Götze's avatar
Philipp Götze committed
910 911 912
      splitLeafNode(node, splitInfo);
      auto &splitRef = *splitInfo;
      auto sibling = splitRef.rightChild.leaf;
913

Philipp Götze's avatar
Philipp Götze committed
914
      /* insert the new entry */
915
      if (key > splitRef.key) {
916
        insertInLeafNodeAtLastPosition(sibling, key, val);
917
      } else {
918
        if (key > findMaxKey(node)) {
919 920 921 922 923 924 925
          /// Special case: new key would be the middle, thus must be right
          insertInLeafNodeAtLastPosition(sibling, key, val);
          splitRef.key = key;
        } else {
          insertInLeafNodeAtLastPosition(node, key, val);
        }
      }
Philipp Götze's avatar
Philipp Götze committed
926

927
      /* inform the caller about the split */
928
      splitRef.key = sibling->keys.get_ro()[findMinKey(sibling)];
929 930
      split = true;
    } else {
931
      /* otherwise, we can simply insert the new entry at the last position */
932 933 934 935 936
      insertInLeafNodeAtLastPosition(node, key