/*
 * Copyright (C) 2017-2020 DBIS Group - TU Ilmenau, All Rights Reserved.
 *
 * This file is part of our NVM-based Data Structures repository.
 *
 * This program is free software: you can redistribute it and/or modify it under the terms of the
 * GNU General Public License as published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 * See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with this program.
 * If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef DBIS_BitPBPTree_hpp_
#define DBIS_BitPBPTree_hpp_

21
22
#include <libpmemobj/ctl.h>

#include <array>
#include <cassert>
#include <cstddef>
#include <functional>
#include <iostream>
#include <iterator>
#include <libpmemobj++/make_persistent.hpp>
#include <libpmemobj++/p.hpp>
#include <libpmemobj++/persistent_ptr.hpp>
#include <libpmemobj++/transaction.hpp>
#include <libpmemobj++/utils.hpp>
#include <utility>

#include "config.h"
#include "utils/Bitmap.hpp"
#include "utils/PersistEmulation.hpp"
#include "utils/SearchFunctions.hpp"
35

36
namespace dbis::pbptrees {
37

38
using pmem::obj::allocation_flag;
39
40
41
42
43
using pmem::obj::delete_persistent;
using pmem::obj::make_persistent;
using pmem::obj::p;
using pmem::obj::persistent_ptr;
using pmem::obj::transaction;
44
template <typename Object>
45
46
47
using pptr = persistent_ptr<Object>;

/**
 * A persistent memory implementation of a B+ tree.
 *
 * @tparam KeyType the data type of the key
 * @tparam ValueType the data type of the values associated with the key
 * @tparam N the maximum number of keys on a branch node
 * @tparam M the maximum number of keys on a leaf node
 */
template <typename KeyType, typename ValueType, size_t N, size_t M>
56
57
58
class BitPBPTree {
  /// we need at least two keys on a branch node to be able to split
  static_assert(N > 2, "number of branch keys has to be >2.");
59
60
  /// we need an even order for branch nodes to be able to merge
  static_assert(N % 2 == 0, "order of branch nodes must be even.");
61
62
63
64
  /// we need at least one key on a leaf node
  static_assert(M > 0, "number of leaf keys should be >0.");

#ifndef UNIT_TESTS
65
 private:
66
#else
67
 public:
68
69
70
71
72
73
74
#endif

  /// Forward declarations
  struct LeafNode;
  struct BranchNode;

  struct Node {
75
    Node() : tag(BLANK){};
76

77
78
    explicit Node(const pptr<LeafNode> &leaf_) : tag(LEAF), leaf(leaf_){};
    explicit Node(const pptr<BranchNode> &branch_) : tag(BRANCH), branch(branch_){};
79
80
81
82
83
84
    Node(const Node &other) { copy(other); };

    void copy(const Node &other) {
      tag = other.tag;
      switch (tag) {
        case LEAF: {
85
86
          leaf = other.leaf;
          break;
87
88
        }
        case BRANCH: {
89
90
          branch = other.branch;
          break;
91
        }
92
        default:;
93
94
95
      }
    }

96
97
98
99
    Node &operator=(const Node &other) {
      copy(other);
      return *this;
    }
100
    Node &operator=(const pptr<LeafNode> &leaf_) {
101
102
103
      tag = LEAF;
      leaf = leaf_;
      return *this;
104
105
    }
    Node &operator=(const pptr<BranchNode> &branch_) {
106
107
108
      tag = BRANCH;
      branch = branch_;
      return *this;
109
110
    }

111
    enum NodeType { BLANK, LEAF, BRANCH } tag;
112
113
114
115
116
117
118
119
120
121
122
123
124

    union {
      pptr<LeafNode> leaf;
      pptr<BranchNode> branch;
    };
  };

  /* -------------------------------------------------------------------------------------------- */

  /**
   * A structure for representing a leaf node of a B+ tree.
   */
  struct alignas(64) LeafNode {
125
126
127
    static constexpr auto NUM_KEYS = M;
    using KEY_TYPE = KeyType;

128
129
130
131
132
    /**
     * Constructor for creating a new empty leaf node.
     */
    LeafNode() : nextLeaf(nullptr), prevLeaf(nullptr) {}

133
134
135
    static constexpr auto BitsetSize = ((M + 63) / 64) * 8;  ///< number * size of words
    static constexpr auto PaddingSize = (64 - (BitsetSize + 32) % 64) % 64;

136
    p<dbis::Bitmap<M>> bits;             ///< bitmap for valid entries
137
138
139
140
141
    pptr<LeafNode> nextLeaf;             ///< pointer to the subsequent sibling
    pptr<LeafNode> prevLeaf;             ///< pointer to the preceeding sibling
    char padding[PaddingSize];           ///< padding to align keys to 64 bytes
    p<std::array<KeyType, M>> keys;      ///< the actual keys
    p<std::array<ValueType, M>> values;  ///< the actual values
142
143
144
145
146
147
148
  };

  /**
   * A structure for representing an branch node (branch node) of a B+ tree.
   * The rightmost child is always at position N.
   */
  struct alignas(64) BranchNode {
149
150
151
    static constexpr auto NUM_KEYS = N;
    using KEY_TYPE = KeyType;

152
153
154
    /**
     * Constructor for creating a new empty branch node.
     */
155
    BranchNode() {}
156

157
158
159
    p<dbis::Bitmap<N>> bits;              ///< bitmap for valid entries
    p<std::array<KeyType, N>> keys;       ///< the actual keys
    p<std::array<Node, N + 1>> children;  ///< pointers to child nodes (BranchNode or LeafNode)
160
161
162
163
  };

  /* -------------------------------------------------------------------------------------------- */

Philipp Götze's avatar
Philipp Götze committed
164
  /**
165
166
167
   * A structure for passing information about a node split to the caller.
   */
  struct SplitInfo {
168
169
170
    KeyType key;      ///< the key at which the node was split
    Node leftChild;   ///< the resulting lhs child node
    Node rightChild;  ///< the resulting rhs child node
171
172
  };

173
174
  static constexpr pobj_alloc_class_desc AllocClass{256, 64, 1, POBJ_HEADER_COMPACT};
  pobj_alloc_class_desc alloc_class;
175
176
177
178
179
  p<unsigned int> depth; /**< the depth of the tree, i.e. the number of levels
                              (0 => rootNode is LeafNode) */
  Node rootNode; /**< pointer to the root node (an instance of @c LeafNode or @c BranchNode).
                      This pointer is never @c nullptr. */

180
 public:
181
182
183
184
185
186
187
  /**
   * Iterator for iterating over the leaf nodes.
   */
  class iterator {
    pptr<LeafNode> currentNode;
    size_t currentPosition;

188
   public:
189
190
191
192
193
194
195
196
197
198
    iterator() : currentNode(nullptr), currentPosition(0) {}

    iterator(const Node &root, size_t d) {
      /// traverse to left-most key
      auto node = root;
      while (d-- > 0) {
        node = node.branch->children.get_ro()[0];
      }
      currentNode = node.leaf;
      currentPosition = 0;
199
      const auto &nodeBits = currentNode->bits.get_ro();
200
      while (!nodeBits.test(currentPosition)) ++currentPosition;
201
202
203
    }

    iterator &operator++() {
204
      if (currentPosition >= M - 1) {
205
206
207
        currentNode = currentNode->nextLeaf;
        currentPosition = 0;
        if (currentNode == nullptr) return *this;
208
        const auto &nodeBits = currentNode->bits.get_ro();
209
        while (!nodeBits.test(currentPosition)) {
210
211
212
213
214
          ++currentPosition;
        };
      } else {
        if (!currentNode->bits.get_ro().test(++currentPosition)) ++(*this);
      }
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
      return *this;
    }

    iterator operator++(int) {
      iterator retval = *this;
      ++(*this);
      return retval;
    }

    bool operator==(iterator other) const {
      return (currentNode == other.currentNode && currentPosition == other.currentPosition);
    }

    bool operator!=(iterator other) const { return !(*this == other); }

    std::pair<KeyType, ValueType> operator*() {
231
232
233
      const auto &nodeRef = *currentNode;
      return std::make_pair(nodeRef.keys.get_ro()[currentPosition],
                            nodeRef.values.get_ro()[currentPosition]);
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
    }

    /// iterator traits
    using difference_type = long;
    using value_type = std::pair<KeyType, ValueType>;
    using pointer = const std::pair<KeyType, ValueType> *;
    using reference = const std::pair<KeyType, ValueType> &;
    using iterator_category = std::forward_iterator_tag;
  };

  /// Returns an iterator built from the root node and the current depth of the tree.
  iterator begin() { return iterator(rootNode, depth.get_ro()); }

  /// Returns the end iterator (a default-constructed iterator).
  iterator end() { return iterator{}; }

  /**
   * Alias for a function passed to the scan method.
   */
  using ScanFunc = std::function<void(const KeyType &key, const ValueType &val)>;

  /**
   * Constructor for creating a new B+ tree.
   */
256
  explicit BitPBPTree(struct pobj_alloc_class_desc _alloc) : depth(0), alloc_class(_alloc) {
257
    // BitPBPTree() : depth(0) {
258
    rootNode = newLeafNode();
259
260
    LOG("created new tree with sizeof(BranchNode) = "
        << sizeof(BranchNode) << ", sizeof(LeafNode) = " << sizeof(LeafNode));
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
  }

  /**
   * Destructor for the B+ tree. It performs no work; in particular it does not release any
   * nodes.
   */
  ~BitPBPTree() = default;

  /**
   * Insert an element (a key-value pair) into the B+ tree. If the key @c key already exists, the
   * corresponding value is replaced by @c val.
   *
   * @param key the key of the element to be inserted
   * @param val the value that is associated with the key
   */
  void insert(const KeyType &key, const ValueType &val) {
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
    SplitInfo splitInfo;

    bool wasSplit = false;
    if (depth == 0) {
      /// the root node is a leaf node
      auto n = rootNode.leaf;
      wasSplit = insertInLeafNode(n, key, val, &splitInfo);
    } else {
      /// the root node is a branch node
      auto n = rootNode.branch;
      wasSplit = insertInBranchNode(n, depth, key, val, &splitInfo);
    }
    if (wasSplit) {
      /// we had an overflow in the node and therefore the node is split
      auto root = newBranchNode();
      auto &rootRef = *root;
      auto &rootChilds = rootRef.children.get_rw();
      rootRef.keys.get_rw()[0] = splitInfo.key;
      rootChilds[0] = splitInfo.leftChild;
      rootChilds[N] = splitInfo.rightChild;
      rootRef.bits.get_rw().set(0);
      rootNode.branch = root;
      ++depth.get_rw();
    }
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
  }

  /**
   * Find the given @c key in the B+ tree and if found return the corresponding value.
   *
   * @param key the key we are looking for
   * @param[out] val a pointer to memory where the value is stored if the key was found;
   *                 must not be @c nullptr
   * @return true if the key was found, false otherwise
   */
  bool lookup(const KeyType &key, ValueType *val) const {
    assert(val != nullptr);

    const auto leafNode = findLeafNode(key);
    const auto pos = lookupPositionInLeafNode(leafNode, key);
    /// a position outside [0, M) signals that the key is not present
    if (!(pos < M)) return false;

    *val = leafNode->values.get_ro()[pos];
    return true;
  }

  /**
   * Delete the entry with the given key @c key from the tree.
   *
   * @param key the key of the entry to be deleted
   * @return true if the key was found and deleted
   */
  bool erase(const KeyType &key) {
    bool result;
330
331
332
333
334
335
336
337
338
339
    if (depth == 0) {
      /// special case: the root node is a leaf node and there is no need to handle underflow
      auto node = rootNode.leaf;
      assert(node != nullptr);
      result = eraseFromLeafNode(node, key);
    } else {
      auto node = rootNode.branch;
      assert(node != nullptr);
      result = eraseFromBranchNode(node, depth, key);
    }
340
341
342
343
344
345
346
347
348
349
350
351
    return result;
  }

  /**
   * Recovery hook; intentionally empty. Nothing to do as everything is persistent and
   * failure-atomic(?).
   */
  void recover() {}

  /**
   * Print the structure and content of the B+ tree to stdout.
   */
  void print() const {
352
353
354
355
    if (depth == 0)
      printLeafNode(0u, rootNode.leaf);
    else
      printBranchNode(0u, rootNode.branch);
356
357
358
359
360
361
362
363
364
365
366
367
  }

  /**
   * Perform a scan over all key-value pairs stored in the B+ tree.
   * For each entry the given function @c func is called. Leaves are visited from left to right;
   * within a leaf the entries are visited in slot order.
   *
   * @param func the function called for each entry
   */
  void scan(ScanFunc func) const {
    /// descend along the first child of every level to reach the leftmost leaf
    auto node = rootNode;
    auto d = depth.get_ro();
    while (d-- > 0) node = node.branch->children.get_ro()[0];

    for (auto leaf = node.leaf; leaf != nullptr; leaf = leaf->nextLeaf) {
      const auto &leafRef = *leaf;
      const auto &leafBits = leafRef.bits.get_ro();
      const auto &leafKeys = leafRef.keys.get_ro();
      const auto &leafValues = leafRef.values.get_ro();
      /// call func for every slot whose validity bit is set
      for (auto slot = 0u; slot < M; ++slot) {
        if (leafBits.test(slot)) func(leafKeys[slot], leafValues[slot]);
      }
    }
  }

  /**
   * Perform a range scan over all elements within the range [minKey, maxKey] and for each element
   * call the given function @c func.
   *
   * @param minKey the lower boundary of the range
   * @param maxKey the upper boundary of the range
   * @param func the function called for each entry
   */
  void scan(const KeyType &minKey, const KeyType &maxKey, ScanFunc func) const {
    auto leaf = findLeafNode(minKey);

    bool higherThanMax = false;
    while (!higherThanMax && leaf != nullptr) {
      /// for each key-value pair within the range call func
402
      const auto &leafRef = *leaf;
403
404
405
406
407
      const auto &leafBits = leafRef.bits.get_ro();
      const auto &leafKeys = leafRef.keys.get_ro();
      const auto &leafValues = leafRef.values.get_ro();
      for (auto i = 0u; i < M; ++i) {
        if (leafBits.test(i)) continue;
408

409
        const auto &key = leafKeys[i];
410
        if (key < minKey) continue;
411
412
413
414
        if (key > maxKey) {
          higherThanMax = true;
          continue;
        };
415

416
        const auto &val = leafValues[i];
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
        func(key, val);
      }
      /// move to the next leaf node
      leaf = leafRef.nextLeaf;
    }
  }

#ifndef UNIT_TESTS
 private:
#endif
  /* -------------------------------------------------------------------------------------------- */
  /*                                     DELETE AT LEAF LEVEL                                     */
  /* -------------------------------------------------------------------------------------------- */

  /**
   * Delete the element with the given key from the given leaf node.
   *
   * @param node the leaf node from which the element is deleted
   * @param key the key of the element to be deleted
   * @return true of the element was deleted
   */
  bool eraseFromLeafNode(const pptr<LeafNode> &node, const KeyType &key) {
    auto pos = lookupPositionInLeafNode(node, key);
440
441
442
443
444
445
446
447
448
449
450
451
452
    return eraseFromLeafNodeAtPosition(node, pos, key);
  }

  /**
   * Delete the element with the given position and key from the given leaf node.
   *
   * @param node the leaf node from which the element is deleted
   * @param pos the position of the key in the node
   * @param key the key of the element to be deleted
   * @return true of the element was deleted
   */
  bool eraseFromLeafNodeAtPosition(const pptr<LeafNode> &node, const unsigned int pos,
                                   const KeyType &key) {
453
    if (pos < M) {
454
      /// simply reset bit
455
      node->bits.get_rw().reset(pos);
456
      PersistEmulation::writeBytes<1>();
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
      return true;
    }
    return false;
  }

  /**
   * Handle the case that during a delete operation an underflow at node @c leaf occured. If
   * possible this is handled
   * (1) by rebalancing the elements among the leaf node and one of its siblings
   * (2) if not possible by merging with one of its siblings.
   *
   * @param node the parent node of the node where the underflow occured
   * @param pos the position of the child node @leaf in the @c children array of the branch node
   * @param leaf the node at which the underflow occured
   */
  void underflowAtLeafLevel(const pptr<BranchNode> &node, const unsigned int pos,
473
                            pptr<LeafNode> &leaf) {
474
475
476
477
478
479
480
481
482
483
484
485
486
    assert(pos <= N);
    auto &nodeRef = *node;
    auto &leafRef = *leaf;
    auto &nodeKeys = nodeRef.keys.get_rw();
    auto prevNumKeys = 0u;
    constexpr auto middle = (M + 1) / 2;

    /// 1. we check whether we can rebalance with one of the siblings but only if both nodes have
    ///    the same direct parent
    if (pos > 0 && (prevNumKeys = leafRef.prevLeaf->bits.get_ro().count()) > middle) {
      /// we have a sibling at the left for rebalancing the keys
      balanceLeafNodes(leafRef.prevLeaf, leaf);
      const auto newMin =
487
          leafRef.keys.get_ro()[findMinKeyPos(leafRef.keys.get_ro(), leafRef.bits.get_ro())];
488
      const auto prevPos =
489
          findMinKeyPosGreaterThan(leafRef.keys.get_ro(), leafRef.bits.get_ro(), newMin);
490
491
492
493
494
495
      nodeKeys[prevPos] = newMin;
    } else if (pos < N && leafRef.nextLeaf->bits.get_ro().count() > middle) {
      /// we have a sibling at the right for rebalancing the keys
      balanceLeafNodes(leafRef.nextLeaf, leaf);
      const auto &nextLeaf = *leafRef.nextLeaf;
      nodeKeys[pos] =
496
          nextLeaf.keys.get_ro()[findMinKeyPos(nextLeaf.keys.get_ro(), nextLeaf.bits.get_ro())];
497
498
499
500
501
502
503
    } else {
      /// 2. if this fails we have to merge two leaf nodes but only if both nodes have the same
      ///    direct parent
      pptr<LeafNode> survivor = nullptr;
      auto &nodeChilds = nodeRef.children.get_rw();
      auto &nodeBits = nodeRef.bits.get_rw();

504
      if (findMinKeyPos(nodeKeys, nodeBits) != pos && prevNumKeys <= middle) {
505
506
507
508
        /// merge left
        survivor = mergeLeafNodes(leafRef.prevLeaf, leaf);
        deleteLeafNode(leaf);
        /// move to next left slot
509
        const auto prevPos = (pos == N) ? findMaxKeyPos(nodeKeys, nodeBits)
510
                                        :  ///< we need a new rightmost node
511
                                 findMaxKeyPosSmallerThan(nodeKeys, nodeBits, nodeKeys[pos]);
512
513
514
515
516
517
518
        nodeChilds[pos] = nodeChilds[prevPos];
        nodeBits.reset(prevPos);
      } else if (pos < N && leafRef.nextLeaf->bits.get_ro().count() <= middle) {
        /// merge right
        survivor = mergeLeafNodes(leaf, leafRef.nextLeaf);
        deleteLeafNode(leafRef.nextLeaf);
        /// move to next right slot
519
        const auto nextPos = findMinKeyPosGreaterThan(nodeKeys, nodeBits, nodeKeys[pos]);
520
521
        nodeChilds[nextPos] = nodeChilds[pos];
        nodeBits.reset(pos);
522
      } else {
523
524
525
526
527
528
529
530
        /// this shouldn't happen?!
        assert(false);
      }
      if (nodeBits.count() == 0) {
        /// This is a special case that happens only if the current node is the root node. Now, we
        /// have to replace the branch root node by a leaf node.
        rootNode = survivor;
        --depth.get_rw();
531
532
      }
    }
533
  }
534
535
536
537
538
539
540
541

  /**
   * Merge two leaf nodes by moving all elements from @c node2 to @c node1.
   *
   * @param node1 the target node of the merge
   * @param node2 the source node
   * @return the merged node (always @c node1)
   */
542
  pptr<LeafNode> mergeLeafNodes(const pptr<LeafNode> &node1, const pptr<LeafNode> &node2) {
543
544
545
546
    assert(node1 != nullptr);
    assert(node2 != nullptr);
    auto &node1Ref = *node1;
    auto &node2Ref = *node2;
547
548
    auto &node1Bits = node1Ref.bits.get_rw();
    const auto &node2Bits = node2Ref.bits.get_ro();
549
550
    const auto n2numKeys = node2Bits.count();
    assert(node1Bits.count() + n2numKeys <= M);
551
552

    /// we move all keys/values from node2 to node1
553
554
555
556
557
558
    auto &node1Keys = node1Ref.keys.get_rw();
    auto &node1Values = node1Ref.values.get_rw();
    const auto &node2Keys = node2Ref.keys.get_ro();
    const auto &node2Values = node2Ref.values.get_ro();
    for (auto i = 0u; i < M; ++i) {
      if (node2Bits.test(i)) {
559
        const auto u = node1Bits.getFreeZero();
560
561
562
        node1Keys[u] = node2Keys[i];
        node1Values[u] = node2Values[i];
        node1Bits.set(u);
563
564
565
      }
    }
    node1Ref.nextLeaf = node2Ref.nextLeaf;
566
567
568
569
570
571
572
573
    if (node2Ref.nextLeaf != nullptr) {
      node2Ref.nextLeaf->prevLeaf = node1;
      PersistEmulation::writeBytes<16>();
    }
    PersistEmulation::writeBytes(
        n2numKeys * (sizeof(KeyType) + sizeof(ValueType)) +  ///< moved keys, vals
        ((n2numKeys + 7) >> 3) + 16                          ///< ceiled bits + nextpointer
    );
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
    return node1;
  }

  /**
   * Redistribute (key, value) pairs from the leaf node @c donor to the leaf node @c receiver such
   * that both nodes have approx. the same number of elements. This method is used in case of an
   * underflow situation of a leaf node.
   *
   * @param donor the leaf node from which the elements are taken
   * @param receiver the sibling leaf node getting the elements from @c donor
   */
  void balanceLeafNodes(const pptr<LeafNode> &donor, const pptr<LeafNode> &receiver) {
    auto &donorRef = *donor;
    auto &receiverRef = *receiver;
    const auto dNumKeys = donorRef.bits.get_ro().count();
    const auto rNumKeys = receiverRef.bits.get_ro().count();
    assert(dNumKeys > rNumKeys);

    const auto balancedNum = (dNumKeys + rNumKeys) / 2;
    const auto toMove = dNumKeys - balancedNum;
    if (toMove == 0) return;

596
597
598
599
600
601
602
603
    auto &receiverKeys = receiverRef.keys.get_rw();
    auto &receiverValues = receiverRef.values.get_rw();
    auto &receiverBits = receiverRef.bits.get_rw();
    const auto &donorKeys = donorRef.keys.get_ro();
    const auto &donorValues = donorRef.values.get_ro();
    auto &donorBits = donorRef.bits.get_rw();

    if (donorKeys[0] < receiverKeys[0]) {
604
      /// move from one node to a node with larger keys
605
      for (auto i = 0u; i < toMove; ++i) {
606
        const auto max = findMaxKeyPos(donorKeys, donorBits);
607
        const auto u = receiverBits.getFreeZero();
608
        /// move the donor's maximum key to the receiver
609
610
611
612
        receiverKeys[u] = donorKeys[max];
        receiverValues[u] = donorValues[max];
        receiverBits.set(u);
        donorBits.reset(max);
613
614
615
      }
    } else {
      /// move from one node to a node with smaller keys
616
      for (auto i = 0u; i < toMove; ++i) {
617
        const auto min = findMinKeyPos(donorKeys, donorBits);
618
        const auto u = receiverBits.getFreeZero();
619
        /// move the donor's minimum key to the receiver
620
621
622
623
        receiverKeys[u] = donorKeys[min];
        receiverValues[u] = donorValues[min];
        receiverBits.set(u);
        donorBits.reset(min);
624
625
      }
    }
626
627
628
629
    PersistEmulation::writeBytes(
        toMove * (sizeof(KeyType) + sizeof(ValueType)) +  ///< move keys, vals
        ((2 * toMove + 7) >> 3)                           ///< (re)set ceiled bits
    );
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
  }

  /* -------------------------------------------------------------------------------------------- */
  /*                                     DELETE AT INNER LEVEL                                    */
  /* -------------------------------------------------------------------------------------------- */
  /**
   * Delete an entry from the tree by recursively going down to the leaf level and handling the
   * underflows.
   *
   * @param node the current branch node
   * @param d the current depth of the traversal
   * @param key the key to be deleted
   * @return true if the entry was deleted
   */
  bool eraseFromBranchNode(const pptr<BranchNode> &node, const unsigned int d, const KeyType &key) {
    assert(d >= 1);
    const auto &nodeRef = *node;
    bool deleted = false;
    /// try to find the branch
    auto pos = lookupPositionInBranchNode(node, key);
650
    auto n = nodeRef.children.get_ro()[pos];
651
652
    if (d == 1) {
      /// the next level is the leaf level
653
      auto &leaf = n.leaf;
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
      deleted = eraseFromLeafNode(leaf, key);
      constexpr auto middle = (M + 1) / 2;
      if (leaf->bits.get_ro().count() < middle) {
        /// handle underflow
        underflowAtLeafLevel(node, pos, leaf);
      }
    } else {
      auto child = n.branch;
      deleted = eraseFromBranchNode(child, d - 1, key);

      pos = lookupPositionInBranchNode(node, key);
      constexpr auto middle = (N + 1) / 2;
      if (child->bits.get_ro().count() < middle) {
        /// handle underflow
        child = underflowAtBranchLevel(node, pos, child);
        if (d == depth && nodeRef.bits.get_ro().count() == 0) {
          /// special case: the root node is empty now
          rootNode = child;
672
          --depth.get_rw();
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
        }
      }
    }
    return deleted;
  }

  /**
   * Handle the case that during a delete operation a underflow at node @c child occurred where
   * @c node is the parent node. If possible this is handled
   * (1) by rebalancing the elements among the node @c child and one of its siblings
   * (2) if not possible by merging with one of its siblings.
   *
   * Conventions used below (children[i] holds the keys smaller than keys[i], children[N] is the
   * rightmost child, key slots are unordered):
   *  - the left sibling sits in the slot of the largest key smaller than keys[pos] (or of the
   *    largest key at all if @c child is the rightmost child); that key separates both nodes
   *  - the right sibling sits in the slot of the smallest key greater than keys[pos]; keys[pos]
   *    separates both nodes
   * NOTE(review): assumes findMinKeyPosGreaterThan yields N if no greater key exists - confirm
   * against utils/SearchFunctions.hpp.
   *
   * @param node the parent node of the node where the underflow occurred
   * @param pos the position of the child node @c child in the @c children array of the branch node
   * @param child the node at which the underflow occurred
   * @return the (possibly new) child node (in case of a merge)
   */
  pptr<BranchNode> underflowAtBranchLevel(const pptr<BranchNode> &node, const unsigned int pos,
                                          pptr<BranchNode> &child) {
    assert(node != nullptr);
    assert(child != nullptr);
    assert(pos <= N);
    auto &nodeRef = *node;
    const auto &nodeKeys = nodeRef.keys.get_ro();
    const auto &nodeChilds = nodeRef.children.get_ro();
    auto &nodeBits = nodeRef.bits.get_rw();
    constexpr auto middle = (N + 1) / 2;

    const bool hasLeft = findMinKeyPos(nodeKeys, nodeBits) != pos;
    const bool hasRight = pos < N;

    /// keys[pos] only exists for pos < N, so the sibling lookups are guarded accordingly
    auto prevPos = static_cast<unsigned int>(N);
    size_t prevNumKeys = 0;
    if (hasLeft) {
      prevPos = static_cast<unsigned int>(
          (pos == N) ? findMaxKeyPos(nodeKeys, nodeBits)
                     : findMaxKeyPosSmallerThan(nodeKeys, nodeBits, nodeKeys[pos]));
      assert(prevPos < N);
      prevNumKeys = nodeChilds[prevPos].branch->bits.get_ro().count();
    }
    auto nextPos = static_cast<unsigned int>(N);
    size_t nextNumKeys = 0;
    if (hasRight) {
      nextPos =
          static_cast<unsigned int>(findMinKeyPosGreaterThan(nodeKeys, nodeBits, nodeKeys[pos]));
      nextNumKeys = nodeChilds[nextPos].branch->bits.get_ro().count();
    }

    /// 1. we check whether we can rebalance with one of the siblings
    if (hasLeft && prevNumKeys > middle) {
      /// we have a sibling at the left for rebalancing the keys; the key between both nodes is
      /// the one in the sibling's slot
      const auto sibling = nodeChilds[prevPos].branch;
      balanceBranchNodes(sibling, child, node, prevPos);
      return child;
    }
    if (hasRight && nextNumKeys > middle) {
      /// we have a sibling at the right for rebalancing the keys
      const auto sibling = nodeChilds[nextPos].branch;
      balanceBranchNodes(sibling, child, node, pos);
      return child;
    }

    /// 2. if this fails we have to merge two branch nodes
    if (hasLeft && prevNumKeys <= middle) {
      /// merge to left
      pptr<BranchNode> lSibling = nodeChilds[prevPos].branch;
      mergeBranchNodes(lSibling, nodeKeys[prevPos], child);
      deleteBranchNode(child);
      /// the survivor takes over the slot of the deleted child; its old slot is dropped
      auto &nodeChildsRw = nodeRef.children.get_rw();
      nodeChildsRw[pos] = nodeChildsRw[prevPos];
      nodeBits.reset(prevPos);
      return lSibling;
    }
    if (hasRight && nextNumKeys <= middle) {
      /// merge from right
      pptr<BranchNode> rSibling = nodeChilds[nextPos].branch;
      mergeBranchNodes(child, nodeKeys[pos], rSibling);
      deleteBranchNode(rSibling);
      /// the survivor takes over the slot of the deleted sibling (slot N if it was rightmost)
      auto &nodeChildsRw = nodeRef.children.get_rw();
      nodeChildsRw[nextPos] = nodeChildsRw[pos];
      nodeBits.reset(pos);
      return child;
    }

    assert(false);  ///< shouldn't happen
    return child;
  }
  /**
   * Merge two branch nodes by moving all keys/children from @c node to @c sibling and put the key
   * @c key from the parent node in the middle. The node @c node should be deleted by the caller.
   *
   * @param sibling the left sibling node which receives all keys/children
   * @param key the key from the parent node that is between sibling and node
   * @param node the node from which we move all keys/children
   */
  void mergeBranchNodes(const pptr<BranchNode> &sibling, const KeyType &key,
                        const pptr<BranchNode> &node) {
    assert(sibling != nullptr);
    assert(node != nullptr);
754

755
756
757
758
759
760
761
762
763
    const auto &nodeRef = *node;
    auto &sibRef = *sibling;
    const auto &nodeKeys = nodeRef.keys.get_ro();
    const auto &nodeChilds = nodeRef.children.get_ro();
    const auto &nodeBits = nodeRef.bits.get_ro();
    auto &sibKeys = sibRef.keys.get_rw();
    auto &sibChilds = sibRef.children.get_rw();
    auto &sibBits = sibRef.bits.get_rw();

764
765
    assert(key <= nodeKeys[findMinKeyPos(nodeKeys, nodeBits)]);
    assert(sibKeys[findMaxKeyPos(sibKeys, sibBits)] < key);
766

767
    auto u = sibBits.getFreeZero();
768
769
770
771
772
    sibKeys[u] = key;
    sibChilds[u] = sibChilds[N];
    sibBits.set(u);
    for (auto i = 0u; i < M; ++i) {
      if (nodeBits.test(i)) {
773
        u = sibBits.getFreeZero();
774
775
776
777
778
779
        sibKeys[u] = nodeKeys[i];
        sibChilds[u] = nodeChilds[i];
        sibBits.set(u);
      }
    }
    sibChilds[N] = nodeChilds[N];
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
  }

  /**
   * Rebalance two branch nodes by moving some key-children pairs from the node @c donor to the node
   * @receiver via the parent node @parent. The position of the key between the two nodes is denoted
   * by @c pos.
   *
   * @param donor the branch node from which the elements are taken
   * @param receiver the sibling branch node getting the elements from @c donor
   * @param parent the parent node of @c donor and @c receiver
   * @param pos the position of the key in node @c parent that lies between @c donor and @c receiver
   */
  void balanceBranchNodes(const pptr<BranchNode> &donor, const pptr<BranchNode> &receiver,
                          const pptr<BranchNode> &parent, const unsigned int pos) {
    auto &donorRef = *donor;
    auto &receiverRef = *receiver;
    auto &parentRef = *parent;
797
798
799
800
801
802
803
804
805
    const auto &donorKeys = donorRef.keys.get_ro();
    auto &donorChilds = donorRef.children.get_rw();
    auto &donorBits = donorRef.bits.get_rw();
    auto &receiverKeys = receiverRef.keys.get_rw();
    auto &receiverChilds = receiverRef.children.get_rw();
    auto &receiverBits = receiverRef.bits.get_rw();
    auto &parentKeys = parentRef.keys.get_rw();
    const auto dNumKeys = donorBits.count();
    const auto rNumKeys = receiverBits.count();
806
807
808
809
810
811
812
    assert(dNumKeys > rNumKeys);

    const auto balancedNum = (dNumKeys + rNumKeys) / 2;
    const auto toMove = dNumKeys - balancedNum;
    if (toMove == 0) return;

    /// 1. move from one node to a node with larger keys
813
    if (donorKeys[donorBits.getFirstSet()] < receiverKeys[receiverBits.getFirstSet()]) {
814
      /// 1.1. copy parent key and rightmost child from donor
815
      auto u = receiverBits.getFreeZero();
816
817
818
      receiverKeys[u] = parentKeys[pos];
      receiverChilds[u] = donorChilds[N];
      receiverBits.set(u);
819
      /// 1.2. move toMove-1 keys/children from donor to receiver
820
      for (auto i = 1u; i < toMove; ++i) {
821
        const auto max = findMaxKeyPos(donorKeys, donorBits);
822
        u = receiverBits.getFreeZero();
823
824
825
826
        receiverKeys[u] = donorKeys[max];
        receiverChilds[u] = donorChilds[max];
        receiverBits.set(u);
        donorBits.reset(max);
827
828
      }
      /// 1.3 set donors new rightmost child and new parent key
829
      const auto dPos = findMaxKeyPos(donorKeys, donorBits);
830
831
832
      donorChilds[N] = donorChilds[dPos];
      parentKeys[pos] = donorKeys[dPos];
      donorBits.reset(dPos);
833

834
      /// 2. move from one node to a node with smaller keys
835
836
    } else {
      /// 2.1. copy parent key and rightmost child of receiver
837
      auto u = receiverBits.getFreeZero();
838
839
840
      receiverKeys[u] = parentKeys[pos];
      receiverChilds[u] = receiverChilds[N];
      receiverBits.set(u);
841
      /// 2.2. move toMove-1 keys/children from donor to receiver
842
      for (auto i = 1u; i < toMove; ++i) {
843
        u = receiverBits.getFreeZero();
844
        const auto min = findMinKeyPos(donorKeys, donorBits);
845
846
847
848
        receiverKeys[u] = donorKeys[min];
        receiverChilds[u] = donorChilds[min];
        receiverBits.set(u);
        donorBits.reset(min);
849
850
      }
      /// 2.3. set receivers new rightmost child and new parent key
851
      const auto dPos = findMinKeyPos(donorKeys, donorBits);
852
853
854
      receiverChilds[N] = donorChilds[dPos];
      parentKeys[pos] = donorKeys[dPos];
      donorBits.reset(dPos);
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
    }
  }

  /* -------------------------------------------------------------------------------------------- */
  /*                                         INSERT                                               */
  /* -------------------------------------------------------------------------------------------- */

  /**
   * Insert a (key, value) pair into the corresponding leaf node. It is the responsibility of the
   * caller to make sure that the node @c node is the correct node. The key is inserted at the last
   * position.
   *
   * @param node the node where the key-value pair is inserted.
   * @param key the key to be inserted
   * @param val the value associated with the key
   * @param splitInfo information about a possible split of the node
   * @return whether a split occured
   */
  bool insertInLeafNode(const pptr<LeafNode> &node, const KeyType &key, const ValueType &val,
                        SplitInfo *splitInfo) {
    const auto pos = lookupPositionInLeafNode(node, key);
    auto &nodeRef = *node;
    if (pos < M) {
      /// handle insert of duplicates
      nodeRef.values.get_rw()[pos] = val;
      return false;
    }
882
    const auto u = nodeRef.bits.get_ro().getFreeZero();
883
884
885
886
    if (u == M) {
      /// split the node
      splitLeafNode(node, splitInfo);
      auto &splitRef = *splitInfo;
887
888
      const auto &sibling = splitRef.rightChild.leaf;
      const auto &sibRef = *sibling;
889
890

      /// insert the new entry
891
      if (key > splitRef.key) {
892
        insertInLeafNodeAtPosition(sibling, sibRef.bits.get_ro().getFreeZero(), key, val);
893
      } else {
894
895
        if (key >
            nodeRef.keys.get_ro()[findMaxKeyPos(nodeRef.keys.get_ro(), nodeRef.bits.get_ro())]) {
896
          /// Special case: new key would be the middle, thus must be right
897
          insertInLeafNodeAtPosition(sibling, sibRef.bits.get_ro().getFreeZero(), key, val);
898
899
          splitRef.key = key;
        } else {
900
          insertInLeafNodeAtPosition(node, nodeRef.bits.get_ro().getFreeZero(), key, val);
901
902
        }
      }
903
904
905
906
      /// inform the caller about the split
      return true;
    } else {
      /// otherwise, we can simply insert the new entry at the last position
907
      insertInLeafNodeAtPosition(node, u, key, val);
908
909
910
911
    }
    return false;
  }

Philipp Götze's avatar
Philipp Götze committed
912
  /**
913
914
915
916
917
   * Split the given leaf node @c node in the middle and move half of the keys/children to the new
   * sibling node.
   *
   * @param node the leaf node to be split
   * @param splitInfo[out] information about the split
Philipp Götze's avatar
Philipp Götze committed
918
   */
919
920
921
922
  void splitLeafNode(const pptr<LeafNode> &node, SplitInfo *splitInfo) {
    auto &nodeRef = *node;

    /// determine the split position by finding the median in unsorted array of keys
923
924
    const auto [b, splitPos] = findSplitKey<KeyType, M>(nodeRef.keys.get_ro().data());
    const auto &splitKey = nodeRef.keys.get_ro()[splitPos];
925
926

    /// copy leaf with flipped bitmap
927
    /*
928
    const auto sibling = newLeafNode(node);
929
930
931
    auto &sibRef = *sibling;
    nodeRef.bits.get_rw() = b;
    sibRef.bits.get_rw() = b.flip();
932
    PersistEmulation::writeBytes<sizeof(LeafNode) + ((2*M+7)>>3)>();
933
    */
934
935

    /// Alternative: move instead of complete copy
936
    // /*
937
938
939
940
941
942
    const auto sibling = newLeafNode();
    auto &sibRef = *sibling;
    auto &sibKeys = sibRef.keys.get_rw();
    auto &sibValues = sibRef.values.get_rw();
    auto &sibBits = sibRef.bits.get_rw();
    auto &nodeBits = nodeRef.bits.get_rw();
943
    const auto &nodeKeys = nodeRef.keys.get_ro();
944
    auto j = 0u;
945
946
    for (auto i = 0u; i < M; ++i) {
      if (nodeKeys[i] >= splitKey) {
947
948
949
950
951
952
953
        sibKeys[j] = nodeRef.keys.get_ro()[i];
        sibValues[j] = nodeRef.values.get_ro()[i];
        sibBits.set(j);
        nodeBits.reset(i);
        j++;
      }
    }
954
955
    PersistEmulation::writeBytes(j * (sizeof(KeyType) + sizeof(ValueType)) +
                                 ((j * 2 + 7) >> 3));  /// j entries + j*2 bits
956
    // */
957
958

    /// setup the list of leaf nodes
959
    if (nodeRef.nextLeaf != nullptr) {
960
961
      sibRef.nextLeaf = nodeRef.nextLeaf;
      nodeRef.nextLeaf->prevLeaf = sibling;
962
      PersistEmulation::writeBytes<16 * 2>();
963
964
965
    }
    nodeRef.nextLeaf = sibling;
    sibRef.prevLeaf = node;
966
    PersistEmulation::writeBytes<16 * 2>();
967
968
969
970
971
972
973
974
975
976

    /// set split information
    auto &splitInfoRef = *splitInfo;
    splitInfoRef.leftChild = node;
    splitInfoRef.rightChild = sibling;
    splitInfoRef.key = splitKey;
  }

  /**
   * Insert a (key, value) pair at the first free position into the leaf node @c node. The caller
Philipp Götze's avatar
Philipp Götze committed
977
   * has to ensure that there is enough space to insert the element.
978
979
980
981
982
   *
   * @oaram node the leaf node where the element is to be inserted
   * @param key the key of the element
   * @param val the actual value corresponding to the key
   */
983
  void insertInLeafNodeAtPosition(const pptr<LeafNode> &node, const unsigned int pos,
984
                                  const KeyType &key, const ValueType &val) {
985
    assert(pos < M);
986
987
    auto &nodeRef = *node;

988
989
990
991
    /// insert the new entry at this position
    nodeRef.keys.get_rw()[pos] = key;
    nodeRef.values.get_rw()[pos] = val;
    nodeRef.bits.get_rw().set(pos);
992
    PersistEmulation::writeBytes<sizeof(KeyType) + sizeof(ValueType) + 1>();
993
994
995
996
997
998
999
1000
  }

  /**
   * Split the given branch node @c node in the middle and move half of the keys/children to the new
   * sibling node.
   *
   * @param node the branch node to be split
   * @param childSplitKey the key on which the split of the child occured