BitHPBPTree.hpp 47.1 KB
Newer Older
1
/*
2
 * Copyright (C) 2017-2020 DBIS Group - TU Ilmenau, All Rights Reserved.
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
 *
 * This file is part of our NVM-based Data Structures repository.
 *
 * This program is free software: you can redistribute it and/or modify it under the terms of the
 * GNU General Public License as published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 * See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with this program.
 * If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef DBIS_BitHPBPTree_hpp_
#define DBIS_BitHPBPTree_hpp_

21
22
#include <libpmemobj/ctl.h>

23
24
25
26
27
28
29
30
#include <array>
#include <iostream>
#include <libpmemobj++/make_persistent.hpp>
#include <libpmemobj++/p.hpp>
#include <libpmemobj++/persistent_ptr.hpp>
#include <libpmemobj++/transaction.hpp>
#include <libpmemobj++/utils.hpp>

31
#include "config.h"
32
33
34
#include "utils/Bitmap.hpp"
      #include <bitset>

35
#include "utils/PersistEmulation.hpp"
36
#include "utils/SearchFunctions.hpp"
37
38
39

namespace dbis::pbptrees {

40
using pmem::obj::allocation_flag;
41
42
43
44
45
using pmem::obj::delete_persistent;
using pmem::obj::make_persistent;
using pmem::obj::p;
using pmem::obj::persistent_ptr;
using pmem::obj::transaction;
46
template <typename Object>
47
48
49
using pptr = persistent_ptr<Object>;

/**
50
51
52
53
54
55
56
57
 * A persistent memory implementation of a B+ tree.
 *
 * @tparam KeyType the data type of the key
 * @tparam ValueType the data type of the values associated with the key
 * @tparam N the maximum number of keys on a branch node
 * @tparam M the maximum number of keys on a leaf node
 */
template <typename KeyType, typename ValueType, size_t N, size_t M>
58
59
60
61
62
63
64
65
66
class BitHPBPTree {
  /// we need at least two keys on a branch node to be able to split
  static_assert(N > 2, "number of branch keys has to be >2.");
  /// we need an even order for branch nodes to be able to merge
  static_assert(N % 2 == 0, "order of branch nodes must be even.");
  /// we need at least one key on a leaf node
  static_assert(M > 0, "number of leaf keys should be >0.");

#ifndef UNIT_TESTS
67
 private:
68
#else
69
 public:
70
71
72
73
74
75
76
#endif

  /// Forward declarations
  struct LeafNode;
  struct BranchNode;

  struct Node {
77
    Node() : tag(BLANK){};
78

79
80
    explicit Node(const pptr<LeafNode> &leaf_) : tag(LEAF), leaf(leaf_){};
    explicit Node(const BranchNode *branch_) : tag(BRANCH), branch(branch_){};
81
82
83
84
85
86
    Node(const Node &other) { copy(other); };

    void copy(const Node &other) {
      tag = other.tag;
      switch (tag) {
        case LEAF: {
87
88
          leaf = other.leaf;
          break;
89
90
        }
        case BRANCH: {
91
92
          branch = other.branch;
          break;
93
94
95
96
97
        }
        default:;
      }
    }

98
99
100
101
    Node &operator=(const Node &other) {
      copy(other);
      return *this;
    }
102
    Node &operator=(const pptr<LeafNode> &leaf_) {
103
104
105
      tag = LEAF;
      leaf = leaf_;
      return *this;
106
107
    }
    Node &operator=(BranchNode *const branch_) {
108
109
110
      tag = BRANCH;
      branch = branch_;
      return *this;
111
112
    }

113
    enum NodeType { BLANK, LEAF, BRANCH } tag;
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134

    union {
      pptr<LeafNode> leaf;
      BranchNode *branch;
    };
  };

  /* -------------------------------------------------------------------------------------------- */

  /**
   * A structure for representing a leaf node of a B+ tree.
   */
  struct alignas(64) LeafNode {
    static constexpr auto NUM_KEYS = M;
    using KEY_TYPE = KeyType;

    /**
     * Constructor for creating a new empty leaf node.
     */
    LeafNode() : nextLeaf(nullptr), prevLeaf(nullptr) {}

135
136
137
    static constexpr auto BitsetSize = ((M + 63) / 64) * 8;  ///< number * size of words
    static constexpr auto PaddingSize = (64 - (BitsetSize + 32) % 64) % 64;

138
    p<dbis::Bitmap<M>> bits;             ///< bitmap for valid entries
139
140
141
142
143
    pptr<LeafNode> nextLeaf;             ///< pointer to the subsequent sibling
    pptr<LeafNode> prevLeaf;             ///< pointer to the preceeding sibling
    char padding[PaddingSize];           ///< padding to align keys to 64 bytes
    p<std::array<KeyType, M>> keys;      ///< the actual keys
    p<std::array<ValueType, M>> values;  ///< the actual values
144
145
146
147
148
149
150
151
152
153
154
155
156
  };

  /**
   * A structure for representing an branch node (branch node) of a B+ tree.
   * The rightmost child is always at position N.
   */
  struct alignas(64) BranchNode {
    static constexpr auto NUM_KEYS = N;
    using KEY_TYPE = KeyType;

    /**
     * Constructor for creating a new empty branch node.
     */
157
    BranchNode() {}
158

159
160
161
    dbis::Bitmap<N> bits;              ///< bitmap for valid entries
    std::array<KeyType, N> keys;       ///< the actual keys
    std::array<Node, N + 1> children;  ///< pointers to child nodes (BranchNode or LeafNode)
162
163
164
165
166
167
168
169
  };

  /**
   * Create a new empty leaf node.
   */
  pptr<LeafNode> newLeafNode() {
    auto pop = pmem::obj::pool_by_vptr(this);
    pptr<LeafNode> newNode = nullptr;
170
171
172
    transaction::run(pop, [&] {
      newNode = make_persistent<LeafNode>(allocation_flag::class_id(alloc_class.class_id));
    });
173
174
175
176
177
178
    return newNode;
  }

  pptr<LeafNode> newLeafNode(const pptr<LeafNode> &other) {
    auto pop = pmem::obj::pool_by_vptr(this);
    pptr<LeafNode> newNode = nullptr;
179
180
181
    transaction::run(pop, [&] {
      newNode = make_persistent<LeafNode>(allocation_flag::class_id(alloc_class.class_id), *other);
    });
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
    return newNode;
  }

  /**
   * Remove/delete an existing leaf node.
   */
  void deleteLeafNode(pptr<LeafNode> &node) {
    auto pop = pmem::obj::pool_by_vptr(this);
    transaction::run(pop, [&] { delete_persistent<LeafNode>(node); });
    node = nullptr;
  }

  /**
   * Create a new empty branch node.
   */
197
  BranchNode *newBranchNode() { return new BranchNode(); }
198

199
  BranchNode *newBranchNode(const BranchNode *other) { return new BranchNode(*other); }
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214

  /**
   * Remove/delete an existing branch node.
   */
  void deleteBranchNode(BranchNode *&node) {
    delete node;
    node = nullptr;
  }

  /* -------------------------------------------------------------------------------------------- */

  /**
   * A structure for passing information about a node split to the caller.
   */
  struct SplitInfo {
215
216
217
    KeyType key;      ///< the key at which the node was split
    Node leftChild;   ///< the resulting lhs child node
    Node rightChild;  ///< the resulting rhs child node
218
219
  };

220
221
  static constexpr pobj_alloc_class_desc AllocClass{256, 64, 1, POBJ_HEADER_COMPACT};
  pobj_alloc_class_desc alloc_class;
222
223
  unsigned int depth; /**< the depth of the tree, i.e. the number of levels
                           (0 => rootNode is LeafNode) */
224
225
  Node rootNode;      /**< pointer to the root node (an instance of @c LeafNode or @c BranchNode).
                           This pointer is never @c nullptr. */
226
227
228
  pptr<LeafNode> leafList; /**< Pointer to the leaf at the most left position.
                                Neccessary for recovery */

229
 public:
230
231
232
233
234
235
236
  /**
   * Iterator for iterating over the leaf nodes.
   */
  class iterator {
    pptr<LeafNode> currentNode;
    size_t currentPosition;

237
   public:
238
239
240
241
242
243
244
245
246
    iterator() : currentNode(nullptr), currentPosition(0) {}

    iterator(const Node &root, size_t d) {
      /// traverse to left-most key
      auto node = root;
      while (d-- > 0) node = node.branch->children[0];
      currentNode = node.leaf;
      currentPosition = 0;
      const auto &nodeBits = currentNode->bits.get_ro();
247
      while (!nodeBits.test(currentPosition)) ++currentPosition;
248
249
250
    }

    iterator &operator++() {
251
      if (currentPosition >= M - 1) {
252
253
254
255
        currentNode = currentNode->nextLeaf;
        currentPosition = 0;
        if (currentNode == nullptr) return *this;
        const auto &nodeBits = currentNode->bits.get_ro();
256
        while (!nodeBits.test(currentPosition)) ++currentPosition;
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
      } else {
        if (!currentNode->bits.get_ro().test(++currentPosition)) ++(*this);
      }
      return *this;
    }

    iterator operator++(int) {
      iterator retval = *this;
      ++(*this);
      return retval;
    }

    bool operator==(iterator other) const {
      return (currentNode == other.currentNode && currentPosition == other.currentPosition);
    }

    bool operator!=(iterator other) const { return !(*this == other); }

    std::pair<KeyType, ValueType> operator*() {
      const auto &nodeRef = *currentNode;
      return std::make_pair(nodeRef.keys.get_ro()[currentPosition],
                            nodeRef.values.get_ro()[currentPosition]);
    }

    /// iterator traits
    using difference_type = long;
    using value_type = std::pair<KeyType, ValueType>;
    using pointer = const std::pair<KeyType, ValueType> *;
    using reference = const std::pair<KeyType, ValueType> &;
    using iterator_category = std::forward_iterator_tag;
  };

  iterator begin() { return iterator(rootNode, depth); }

  iterator end() { return iterator(); }

  /**
   * Alias for a function passed to the scan method.
   */
  using ScanFunc = std::function<void(const KeyType &key, const ValueType &val)>;

  /**
   * Constructor for creating a new B+ tree.
   */
301
  explicit BitHPBPTree(struct pobj_alloc_class_desc _alloc) : depth(0), alloc_class(_alloc) {
302
    // BitHPBPTree() : depth(0) {
303
304
    rootNode = newLeafNode();
    leafList = rootNode.leaf;
305
306
    LOG("created new tree with sizeof(BranchNode) = "
        << sizeof(BranchNode) << ", sizeof(LeafNode) = " << sizeof(LeafNode));
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
  }

  /**
   * Destructor for the B+ tree.
   */
  ~BitHPBPTree() {}

  /**
   * Insert an element (a key-value pair) into the B+ tree. If the key @c key already exists, the
   * corresponding value is replaced by @c val.
   *
   * @param key the key of the element to be inserted
   * @param val the value that is associated with the key
   */
  void insert(const KeyType &key, const ValueType &val) {
    SplitInfo splitInfo;

    bool wasSplit = false;
    if (depth == 0) {
      /// the root node is a leaf node
      auto n = rootNode.leaf;
      wasSplit = insertInLeafNode(n, key, val, &splitInfo);
    } else {
      /// the root node is a branch node
      auto n = rootNode.branch;
      wasSplit = insertInBranchNode(n, depth, key, val, &splitInfo);
    }
    if (wasSplit) {
      /// we had an overflow in the node and therefore the node is split
      auto root = newBranchNode();
      auto &rootRef = *root;
      rootRef.keys[0] = splitInfo.key;
      rootRef.children[0] = splitInfo.leftChild;
      rootRef.children[N] = splitInfo.rightChild;
      rootRef.bits.set(0);
      rootNode.branch = root;
      ++depth;
    }
  }

  /**
   * Find the given @c key in the B+ tree and if found return the corresponding value.
   *
   * @param key the key we are looking for
   * @param[out] val a pointer to memory where the value is stored if the key was found
   * @return true if the key was found, false otherwise
   */
  bool lookup(const KeyType &key, ValueType *val) const {
    assert(val != nullptr);

    const auto leafNode = findLeafNode(key);
    const auto pos = lookupPositionInLeafNode(leafNode, key);
    if (pos < M) {
      /// we found it
      *val = leafNode->values.get_ro()[pos];
      return true;
    }
    return false;
  }

  /**
   * Delete the entry with the given key @c key from the tree.
   *
   * @param key the key of the entry to be deleted
   * @return true if the key was found and deleted
   */
  bool erase(const KeyType &key) {
    bool result;
    if (depth == 0) {
      /// special case: the root node is a leaf node and there is no need to handle underflow
      auto node = rootNode.leaf;
      assert(node != nullptr);
      result = eraseFromLeafNode(node, key);
    } else {
      auto node = rootNode.branch;
      assert(node != nullptr);
      result = eraseFromBranchNode(node, depth, key);
    }
    return result;
  }

  /**
   * Recover the BitHPBPTree by iterating over the LeafList and using the recoveryInsert method.
   */
  void recover() {
    LOG("Starting RECOVERY of BitHPBPTree");
    pptr<LeafNode> currentLeaf = leafList;
394
    if (leafList == nullptr) {
395
396
      LOG("No data to recover HPBPTree");
    }
397
    if (leafList->nextLeaf == nullptr) {
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
      /// The index has only one node, so the leaf node becomes the root node
      rootNode = leafList;
      depth = 0;
    } else {
      rootNode = newBranchNode();
      depth = 1;
      rootNode.branch->children[0] = currentLeaf;
      currentLeaf = currentLeaf->nextLeaf;
      while (currentLeaf != nullptr) {
        recoveryInsert(currentLeaf);
        currentLeaf = currentLeaf->nextLeaf;
      }
    }
    LOG("RECOVERY Done")
  }

  /**
   * Print the structure and content of the B+ tree to stdout.
   */
  void print() const {
418
419
420
421
    if (depth == 0)
      printLeafNode(0u, rootNode.leaf);
    else
      printBranchNode(0u, rootNode.branch);
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
  }

  /**
   * Perform a scan over all key-value pairs stored in the B+ tree.
   * For each entry the given function @func is called.
   *
   * @param func the function called for each entry
   */
  void scan(ScanFunc func) const {
    /// we traverse to the leftmost leaf node
    auto node = rootNode;
    auto d = depth;
    /// as long as we aren't at the leaf level we follow the path down
    while (d-- > 0) node = node.branch->children[0];
    auto leaf = node.leaf;
    while (leaf != nullptr) {
      const auto &leafRef = *leaf;
      /// for each key-value pair call func
      const auto &leafBits = leafRef.bits.get_ro();
      const auto &leafKeys = leafRef.keys.get_ro();
      const auto &leafValues = leafRef.values.get_ro();
      for (auto i = 0u; i < M; ++i) {
        if (!leafBits.test(i)) continue;
        const auto &key = leafKeys[i];
        const auto &val = leafValues[i];
        func(key, val);
      }
      /// move to the next leaf node
      leaf = leafRef.nextLeaf;
    }
  }

  /**
   * Perform a range scan over all elements within the range [minKey, maxKey] and for each element
   * call the given function @c func.
   *
   * @param minKey the lower boundary of the range
   * @param maxKey the upper boundary of the range
   * @param func the function called for each entry
   */
  void scan(const KeyType &minKey, const KeyType &maxKey, ScanFunc func) const {
    auto leaf = findLeafNode(minKey);

    bool higherThanMax = false;
    while (!higherThanMax && leaf != nullptr) {
      /// for each key-value pair within the range call func
      const auto &leafRef = *leaf;
      const auto &leafBits = leafRef.bits.get_ro();
      const auto &leafKeys = leafRef.keys.get_ro();
      const auto &leafValues = leafRef.values.get_ro();
      for (auto i = 0u; i < M; ++i) {
        if (leafBits.test(i)) continue;

        const auto &key = leafKeys[i];
        if (key < minKey) continue;
477
478
479
480
        if (key > maxKey) {
          higherThanMax = true;
          continue;
        };
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505

        const auto &val = leafValues[i];
        func(key, val);
      }
      /// move to the next leaf node
      leaf = leafRef.nextLeaf;
    }
  }

#ifndef UNIT_TESTS
 private:
#endif
  /* -------------------------------------------------------------------------------------------- */
  /*                                     DELETE AT LEAF LEVEL                                     */
  /* -------------------------------------------------------------------------------------------- */

  /**
   * Delete the element with the given key from the given leaf node.
   *
   * @param node the leaf node from which the element is deleted
   * @param key the key of the element to be deleted
   * @return true of the element was deleted
   */
  bool eraseFromLeafNode(const pptr<LeafNode> &node, const KeyType &key) {
    auto pos = lookupPositionInLeafNode(node, key);
506
507
508
509
510
511
512
513
514
515
516
517
518
    return eraseFromLeafNodeAtPosition(node, pos, key);
  }

  /**
   * Delete the element with the given position and key from the given leaf node.
   *
   * @param node the leaf node from which the element is deleted
   * @param pos the position of the key in the node
   * @param key the key of the element to be deleted
   * @return true of the element was deleted
   */
  bool eraseFromLeafNodeAtPosition(const pptr<LeafNode> &node, const unsigned int pos,
                                   const KeyType &key) {
519
520
521
    if (pos < M) {
      /// simply reset bit
      node->bits.get_rw().reset(pos);
522
      PersistEmulation::writeBytes<1>();
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
      return true;
    }
    return false;
  }

  /**
   * Handle the case that during a delete operation an underflow at node @c leaf occured. If
   * possible this is handled
   * (1) by rebalancing the elements among the leaf node and one of its siblings
   * (2) if not possible by merging with one of its siblings.
   *
   * @param node the parent node of the node where the underflow occured
   * @param pos the position of the child node @leaf in the @c children array of the branch node
   * @param leaf the node at which the underflow occured
   */
538
539
540
541
542
543
  void underflowAtLeafLevel(BranchNode *const node, const unsigned int pos, pptr<LeafNode> &leaf) {
    assert(pos <= N);
    auto &nodeRef = *node;
    auto &leafRef = *leaf;
    auto prevNumKeys = 0u;
    constexpr auto middle = (M + 1) / 2;
544

545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
    /// 1. we check whether we can rebalance with one of the siblings but only if both nodes have
    ///    the same direct parent
    if (pos > 0 && (prevNumKeys = leafRef.prevLeaf->bits.get_ro().count()) > middle) {
      /// we have a sibling at the left for rebalancing the keys
      balanceLeafNodes(leafRef.prevLeaf, leaf);
      const auto newMin =
          leafRef.keys.get_ro()[findMinKeyPos(leafRef.keys.get_ro(), leafRef.bits.get_ro())];
      const auto prevPos =
          findMinKeyPosGreaterThan(leafRef.keys.get_ro(), leafRef.bits.get_ro(), newMin);
      nodeRef.keys[prevPos] = newMin;
    } else if (pos < N && leafRef.nextLeaf->bits.get_ro().count() > middle) {
      /// we have a sibling at the right for rebalancing the keys
      balanceLeafNodes(leafRef.nextLeaf, leaf);
      const auto &nextLeaf = *leafRef.nextLeaf;
      nodeRef.keys[pos] =
560
          nextLeaf.keys.get_ro()[findMinKeyPos(nextLeaf.keys.get_ro(), nextLeaf.bits.get_ro())];
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
    } else {
      /// 2. if this fails we have to merge two leaf nodes but only if both nodes have the same
      ///    direct parent
      pptr<LeafNode> survivor = nullptr;

      if (findMinKeyPos(nodeRef.keys, nodeRef.bits) != pos && prevNumKeys <= middle) {
        /// merge left
        survivor = mergeLeafNodes(leafRef.prevLeaf, leaf);
        deleteLeafNode(leaf);
        /// move to next left slot
        const auto prevPos =
            (pos == N) ? findMaxKeyPos(nodeRef.keys, nodeRef.bits)
                       :  ///< we need a new rightmost node
                findMaxKeyPosSmallerThan(nodeRef.keys, nodeRef.bits, nodeRef.keys[pos]);
        nodeRef.children[pos] = nodeRef.children[prevPos];
        nodeRef.bits.reset(prevPos);
      } else if (pos < N && leafRef.nextLeaf->bits.get_ro().count() <= middle) {
        /// merge right
        survivor = mergeLeafNodes(leaf, leafRef.nextLeaf);
        deleteLeafNode(leafRef.nextLeaf);
        /// move to next right slot
        const auto nextPos =
            findMinKeyPosGreaterThan(nodeRef.keys, nodeRef.bits, nodeRef.keys[pos]);
        nodeRef.children[nextPos] = nodeRef.children[pos];
        nodeRef.bits.reset(pos);
      } else
        assert(false);  ///< this shouldn't happen?!

      if (nodeRef.bits.count() == 0) {
        /// This is a special case that happens only if the current node is the root node. Now, we
        /// have to replace the branch root node by a leaf node.
        rootNode = survivor;
        --depth;
594
595
      }
    }
596
  }
597
598
599
600
601
602
603
604

  /**
   * Merge two leaf nodes by moving all elements from @c node2 to @c node1.
   *
   * @param node1 the target node of the merge
   * @param node2 the source node
   * @return the merged node (always @c node1)
   */
605
  pptr<LeafNode> mergeLeafNodes(const pptr<LeafNode> &node1, const pptr<LeafNode> &node2) {
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
    assert(node1 != nullptr);
    assert(node2 != nullptr);
    auto &node1Ref = *node1;
    auto &node2Ref = *node2;
    auto &node1Bits = node1Ref.bits.get_rw();
    const auto &node2Bits = node2Ref.bits.get_ro();
    assert(node1Bits.count() + node2Bits.count() <= M);

    /// we move all keys/values from node2 to node1
    auto &node1Keys = node1Ref.keys.get_rw();
    auto &node1Values = node1Ref.values.get_rw();
    const auto &node2Keys = node2Ref.keys.get_ro();
    const auto &node2Values = node2Ref.values.get_ro();
    for (auto i = 0u; i < M; ++i) {
      if (node2Bits.test(i)) {
621
        const auto u = node1Bits.getFreeZero();
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
        node1Keys[u] = node2Keys[i];
        node1Values[u] = node2Values[i];
        node1Bits.set(u);
      }
    }
    node1Ref.nextLeaf = node2Ref.nextLeaf;
    if (node2Ref.nextLeaf != nullptr) node2Ref.nextLeaf->prevLeaf = node1;
    return node1;
  }

  /**
   * Redistribute (key, value) pairs from the leaf node @c donor to the leaf node @c receiver such
   * that both nodes have approx. the same number of elements. This method is used in case of an
   * underflow situation of a leaf node.
   *
   * @param donor the leaf node from which the elements are taken
   * @param receiver the sibling leaf node getting the elements from @c donor
   */
  void balanceLeafNodes(const pptr<LeafNode> &donor, const pptr<LeafNode> &receiver) {
    auto &donorRef = *donor;
    auto &receiverRef = *receiver;
    const auto dNumKeys = donorRef.bits.get_ro().count();
    const auto rNumKeys = receiverRef.bits.get_ro().count();
    assert(dNumKeys > rNumKeys);

    const auto balancedNum = (dNumKeys + rNumKeys) / 2;
    const auto toMove = dNumKeys - balancedNum;
    if (toMove == 0) return;

    auto &receiverKeys = receiverRef.keys.get_rw();
    auto &receiverValues = receiverRef.values.get_rw();
    auto &receiverBits = receiverRef.bits.get_rw();
    const auto &donorKeys = donorRef.keys.get_ro();
    const auto &donorValues = donorRef.values.get_ro();
    auto &donorBits = donorRef.bits.get_rw();

    if (donorKeys[0] < receiverKeys[0]) {
      /// move from one node to a node with larger keys
      for (auto i = 0u; i < toMove; ++i) {
661
        const auto max = findMaxKeyPos(donorKeys, donorBits);
662
        const auto u = receiverBits.getFreeZero();
663
664
665
666
667
668
669
670
671
        /// move the donor's maximum key to the receiver
        receiverKeys[u] = donorKeys[max];
        receiverValues[u] = donorValues[max];
        receiverBits.set(u);
        donorBits.reset(max);
      }
    } else {
      /// move from one node to a node with smaller keys
      for (auto i = 0u; i < toMove; ++i) {
672
        const auto min = findMinKeyPos(donorKeys, donorBits);
673
        const auto u = receiverBits.getFreeZero();
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
        /// move the donor's minimum key to the receiver
        receiverKeys[u] = donorKeys[min];
        receiverValues[u] = donorValues[min];
        receiverBits.set(u);
        donorBits.reset(min);
      }
    }
  }

  /* -------------------------------------------------------------------------------------------- */
  /*                                     DELETE AT INNER LEVEL                                    */
  /* -------------------------------------------------------------------------------------------- */
  /**
   * Delete an entry from the tree by recursively going down to the leaf level and handling the
   * underflows.
   *
   * @param node the current branch node
   * @param d the current depth of the traversal
   * @param key the key to be deleted
   * @return true if the entry was deleted
   */
695
  bool eraseFromBranchNode(BranchNode *const node, const unsigned int d, const KeyType &key) {
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
    assert(d >= 1);
    auto &nodeRef = *node;
    bool deleted = false;
    /// try to find the branch
    auto pos = lookupPositionInBranchNode(node, key);
    if (d == 1) {
      /// the next level is the leaf level
      auto &leaf = nodeRef.children[pos].leaf;
      deleted = eraseFromLeafNode(leaf, key);
      constexpr auto middle = (M + 1) / 2;
      if (leaf->bits.get_ro().count() < middle) {
        /// handle underflow
        underflowAtLeafLevel(node, pos, leaf);
      }
    } else {
      auto &child = nodeRef.children[pos].branch;
      deleted = eraseFromBranchNode(child, d - 1, key);

      pos = lookupPositionInBranchNode(node, key);
      constexpr auto middle = (N + 1) / 2;
      if (child->bits.count() < middle) {
        /// handle underflow
        child = underflowAtBranchLevel(node, pos, child);
        if (d == depth && nodeRef.bits.count() == 0) {
          /// special case: the root node is empty now
          rootNode = child;
          --depth;
        }
      }
    }
    return deleted;
  }

  /**
   * Handle the case that during a delete operation a underflow at node @c child occured where
   * @c node is the parent node. If possible this is handled
   * (1) by rebalancing the elements among the node @c child and one of its siblings
   * (2) if not possible by merging with one of its siblings.
   *
   * @param node the parent node of the node where the underflow occured
   * @param pos the position of the child node @child in the @c children array of the branch node
   * @param child the node at which the underflow occured
   * @return the (possibly new) child node (in case of a merge)
   */
740
741
  BranchNode *underflowAtBranchLevel(BranchNode *const node, const unsigned int pos,
                                     BranchNode *child) {
742
743
744
    assert(node != nullptr);
    assert(child != nullptr);
    auto &nodeRef = *node;
745
    const auto nMinKeyPos = findMinKeyPos(nodeRef.keys, nodeRef.bits);
746
747
    const auto prevPos =
        findMaxKeyPosSmallerThan(nodeRef.keys, nodeRef.bits, nodeRef.keys[pos]);  // could be N
748
    const auto prevNumKeys = nodeRef.children[prevPos].branch->bits.count();
749
    const auto nextPos = findMinKeyPosGreaterThan(nodeRef.keys, nodeRef.bits, nodeRef.keys[pos]);
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
    const auto nextNumKeys = nodeRef.children[nextPos].branch->bits.count();
    constexpr auto middle = (N + 1) / 2;

    /// 1. we check whether we can rebalance with one of the siblings
    if (nMinKeyPos != pos && prevNumKeys > middle) {
      /// we have a sibling at the left for rebalancing the keys
      assert(prevPos != N);
      const auto sibling = nodeRef.children[prevPos].branch;
      balanceBranchNodes(sibling, child, node, pos);
      return child;
    } else if (pos < N && nextNumKeys > middle) {
      /// we have a sibling at the right for rebalancing the keys
      const auto sibling = nodeRef.children[nextPos].branch;
      balanceBranchNodes(sibling, child, node, pos);
      return child;
    }
    /// 2. if this fails we have to merge two branch nodes
    if (nMinKeyPos != pos && prevNumKeys <= middle) {
      /// merge to left
      auto &lSibling = nodeRef.children[prevPos].branch;
      mergeBranchNodes(lSibling, nodeRef.keys[pos], child);
      deleteBranchNode(child);
      nodeRef.bits.reset(pos);
773
      if (pos == N) nodeRef.children[N] = child;  ///< new rightmost child
774
775
776
777
778
779
780
      return lSibling;
    } else if (pos < N && nextNumKeys <= middle) {
      /// merge from right
      auto &rSibling = nodeRef.children[nextPos].branch;
      mergeBranchNodes(child, nodeRef.keys[pos], rSibling);
      deleteBranchNode(rSibling);
      nodeRef.bits.reset(pos);
781
      if (pos == findMaxKeyPos(nodeRef.keys, nodeRef.bits))
782
        nodeRef.children[N] = child;  ///< new rightmost child
783
      return child;
784
    } else {
785
      assert(false);  ///< shouldn't happen
786
787
      return child;
    }
788
789
790
791
792
793
794
795
796
797
  }

  /**
   * Merge two branch nodes by moving all keys/children from @c node to @c sibling and put the key
   * @c key from the parent node in the middle. The node @c node should be deleted by the caller.
   *
   * @param sibling the left sibling node which receives all keys/children
   * @param key the key from the parent node that is between sibling and node
   * @param node the node from which we move all keys/children
   */
798
  void mergeBranchNodes(BranchNode *const sibling, const KeyType &key, const BranchNode *node) {
799
800
801
802
803
804
    assert(sibling != nullptr);
    assert(node != nullptr);

    const auto &nodeRef = *node;
    auto &sibRef = *sibling;

805
806
    assert(key <= nodeRef.keys[findMinKeyPos(nodeRef.keys, nodeRef.bits)]);
    assert(sibRef.keys[findMaxKeyPos(sibRef.keys, sibRef.bits)] < key);
807

808
    auto u = sibRef.bits.getFreeZero();
809
810
811
812
813
    sibRef.keys[u] = key;
    sibRef.children[u] = sibRef.children[N];
    sibRef.bits.set(u);
    for (auto i = 0u; i < M; ++i) {
      if (nodeRef.bits.test(i)) {
814
        u = sibRef.bits.getFreeZero();
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
        sibRef.keys[u] = nodeRef.keys[i];
        sibRef.children[u] = nodeRef.children[i];
        sibRef.bits.set(u);
      }
    }
    sibRef.children[N] = nodeRef.children[N];
  }

  /**
   * Rebalance two branch nodes by moving some key-children pairs from the node @c donor to the node
   * @receiver via the parent node @parent. The position of the key between the two nodes is denoted
   * by @c pos.
   *
   * @param donor the branch node from which the elements are taken
   * @param receiver the sibling branch node getting the elements from @c donor
   * @param parent the parent node of @c donor and @c receiver
   * @param pos the position of the key in node @c parent that lies between @c donor and @c receiver
   */
833
834
  void balanceBranchNodes(BranchNode *const donor, BranchNode *const receiver,
                          BranchNode *const parent, const unsigned int pos) {
835
836
837
838
839
840
841
842
843
844
845
846
    auto &donorRef = *donor;
    auto &receiverRef = *receiver;
    auto &parentRef = *parent;
    const auto dNumKeys = donorRef.bits.count();
    const auto rNumKeys = receiverRef.bits.count();
    assert(dNumKeys > rNumKeys);

    const auto balancedNum = (dNumKeys + rNumKeys) / 2;
    const auto toMove = dNumKeys - balancedNum;
    if (toMove == 0) return;

    /// 1. move from one node to a node with larger keys
847
848
    if (donorRef.keys[donorRef.bits.getFirstSet()] <
        receiverRef.keys[receiverRef.bits.getFirstSet()]) {
849
      /// 1.1. copy parent key and rightmost child from donor
850
      auto u = receiverRef.bits.getFreeZero();
851
852
853
854
855
      receiverRef.keys[u] = parentRef.keys[pos];
      receiverRef.children[u] = donorRef.children[N];
      receiverRef.bits.set(u);
      /// 1.2. move toMove-1 keys/children from donor to receiver
      for (auto i = 1u; i < toMove; ++i) {
856
        const auto max = findMaxKeyPos(donorRef.keys, donorRef.bits);
857
        u = receiverRef.bits.getFreeZero();
858
859
860
861
862
863
        receiverRef.keys[u] = donorRef.keys[max];
        receiverRef.children[u] = donorRef.children[max];
        receiverRef.bits.set(u);
        donorRef.bits.reset(max);
      }
      /// 1.3 set donors new rightmost child and new parent key
864
      const auto dPos = findMaxKeyPos(donorRef.keys, donorRef.bits);
865
866
867
868
      donorRef.children[N] = donorRef.children[dPos];
      parentRef.keys[pos] = donorRef.keys[dPos];
      donorRef.bits.reset(dPos);

869
      /// 2. move from one node to a node with smaller keys
870
871
    } else {
      /// 2.1. copy parent key and rightmost child of receiver
872
      auto u = receiverRef.bits.getFreeZero();
873
874
875
876
877
      receiverRef.keys[u] = parentRef.keys[pos];
      receiverRef.children[u] = receiverRef.children[N];
      receiverRef.bits.set(u);
      /// 2.2. move toMove-1 keys/children from donor to receiver
      for (auto i = 1u; i < toMove; ++i) {
878
        u = receiverRef.bits.getFreeZero();
879
        const auto min = findMinKeyPos(donorRef.keys, donorRef.bits);
880
881
882
883
884
885
        receiverRef.keys[u] = donorRef.keys[min];
        receiverRef.children[u] = donorRef.children[min];
        receiverRef.bits.set(u);
        donorRef.bits.reset(min);
      }
      /// 2.3. set receivers new rightmost child and new parent key
886
      const auto dPos = findMinKeyPos(donorRef.keys, donorRef.bits);
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
      receiverRef.children[N] = donorRef.children[dPos];
      parentRef.keys[pos] = donorRef.keys[dPos];
      donorRef.bits.reset(dPos);
    }
  }

  /* -------------------------------------------------------------------------------------------- */
  /*                                         INSERT                                               */
  /* -------------------------------------------------------------------------------------------- */

  /**
   * Insert a (key, value) pair into the corresponding leaf node. It is the responsibility of the
   * caller to make sure that the node @c node is the correct node. The key is inserted at the last
   * position.
   *
   * @param node the node where the key-value pair is inserted.
   * @param key the key to be inserted
   * @param val the value associated with the key
   * @param splitInfo information about a possible split of the node
   * @return whether a split occured
   */
  bool insertInLeafNode(const pptr<LeafNode> &node, const KeyType &key, const ValueType &val,
                        SplitInfo *splitInfo) {
    const auto pos = lookupPositionInLeafNode(node, key);
    auto &nodeRef = *node;
    if (pos < M) {
      /// handle insert of duplicates
      nodeRef.values.get_rw()[pos] = val;
      return false;
    }
917
    const auto u = nodeRef.bits.get_ro().getFreeZero();
918
919
920
921
922
923
924
925
926
    if (u == M) {
      /// split the node
      splitLeafNode(node, splitInfo);
      auto &splitRef = *splitInfo;
      const auto &sibling = splitRef.rightChild.leaf;
      const auto &sibRef = *sibling;

      /// insert the new entry
      if (key > splitRef.key) {
927
        insertInLeafNodeAtPosition(sibling, sibRef.bits.get_ro().getFreeZero(), key, val);
928
      } else {
929
930
        if (key >
            nodeRef.keys.get_ro()[findMaxKeyPos(nodeRef.keys.get_ro(), nodeRef.bits.get_ro())]) {
931
          /// Special case: new key would be the middle, thus must be right
932
          insertInLeafNodeAtPosition(sibling, sibRef.bits.get_ro().getFreeZero(), key, val);
933
934
          splitRef.key = key;
        } else {
935
          insertInLeafNodeAtPosition(node, nodeRef.bits.get_ro().getFreeZero(), key, val);
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
        }
      }
      /// inform the caller about the split
      return true;
    } else {
      /// otherwise, we can simply insert the new entry at the last position
      insertInLeafNodeAtPosition(node, u, key, val);
    }
    return false;
  }

  /**
   * Split the given leaf node @c node in the middle and move half of the keys/children to the new
   * sibling node.
   *
   * @param node the leaf node to be split
   * @param splitInfo[out] information about the split
   */
  void splitLeafNode(const pptr<LeafNode> &node, SplitInfo *splitInfo) {
    auto &nodeRef = *node;

    /// determine the split position by finding the median in unsorted array of keys
958
    auto [b, splitPos] = findSplitKey<KeyType, M>(nodeRef.keys.get_ro().data());
959
960
961
962
963
964
965
    const auto &splitKey = nodeRef.keys.get_ro()[splitPos];

    /// copy leaf with flipped bitmap
    const auto sibling = newLeafNode(node);
    auto &sibRef = *sibling;
    nodeRef.bits.get_rw() = b;
    sibRef.bits.get_rw() = b.flip();
966
    PersistEmulation::writeBytes<sizeof(LeafNode) + ((2 * M + 7) >> 3)>();
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985

    /// Alternative: move instead of complete copy
    /*
    const auto sibling = newLeafNode();
    auto &sibRef = *sibling;
    auto &sibKeys = sibRef.keys.get_rw();
    auto &sibValues = sibRef.values.get_rw();
    auto &sibBits = sibRef.bits.get_rw();
    auto &nodeBits = nodeRef.bits.get_rw();
    auto j = 0u;
    for(auto i = 0u; i < M; ++i) {
      if(nodeRef.keys.get_ro()[i] >= splitKey) {
        sibKeys[j] = nodeRef.keys.get_ro()[i];
        sibValues[j] = nodeRef.values.get_ro()[i];
        sibBits.set(j);
        nodeBits.reset(i);
        j++;
      }
    }
986
987
    PersistEmulation::writeBytes(j * (sizeof(KeyType) + sizeof(ValueType)) + ((j*2+7)>>3)); /// j
    entries + j*2 bits
988
989
990
    */

    /// setup the list of leaf nodes
991
    if (nodeRef.nextLeaf != nullptr) {
992
993
      sibRef.nextLeaf = nodeRef.nextLeaf;
      nodeRef.nextLeaf->prevLeaf = sibling;
994
      PersistEmulation::writeBytes<16 * 2>();
995
996
997
    }
    nodeRef.nextLeaf = sibling;
    sibRef.prevLeaf = node;
998
    PersistEmulation::writeBytes<16 * 2>();
999
1000

    /// set split information