wBPTree.hpp 49.3 KB
Newer Older
1
/*
2
 * Copyright (C) 2017-2020 DBIS Group - TU Ilmenau, All Rights Reserved.
3
 *
4
 * This file is part of our NVM-based Data Structures repository.
5
 *
6
7
8
 * This program is free software: you can redistribute it and/or modify it under the terms of the
 * GNU General Public License as published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
9
 *
10
11
12
 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 * See the GNU General Public License for more details.
13
 *
14
15
 * You should have received a copy of the GNU General Public License along with this program.
 * If not, see <http://www.gnu.org/licenses/>.
16
17
18
19
20
 */

#ifndef DBIS_wBPTree_hpp_
#define DBIS_wBPTree_hpp_

21
22
#include <libpmemobj/ctl.h>

23
24
25
26
27
#include <array>
#include <iostream>
#include <libpmemobj++/make_persistent.hpp>
#include <libpmemobj++/p.hpp>
#include <libpmemobj++/persistent_ptr.hpp>
28
#include <libpmemobj++/pool.hpp>
29
30
31
32
#include <libpmemobj++/transaction.hpp>
#include <libpmemobj++/utils.hpp>

#include "config.h"
33
#include "utils/Bitmap.hpp"
34
#include "utils/PersistEmulation.hpp"
35
#include "utils/SearchFunctions.hpp"
36

37
namespace dbis::pbptrees {
38

39
using pmem::obj::allocation_flag;
40
41
42
43
using pmem::obj::delete_persistent;
using pmem::obj::make_persistent;
using pmem::obj::p;
using pmem::obj::persistent_ptr;
44
using pmem::obj::pool;
45
using pmem::obj::transaction;
46
template <typename Object>
47
using pptr = persistent_ptr<Object>;
48
49

/**
50
 * A persistent memory implementation of a wBPTree.
51
52
53
54
55
56
 *
 * @tparam KeyType the data type of the key
 * @tparam ValueType the data type of the values associated with the key
 * @tparam N the maximum number of keys on a branch node
 * @tparam M the maximum number of keys on a leaf node
 */
57
template <typename KeyType, typename ValueType, int N, int M>
58
class wBPTree {
59
  /// we need at least two keys on a branch node to be able to split
60
  static_assert(N > 2, "number of branch keys has to be >2.");
61
62
  /// we need an even order for branch nodes to be able to merge
  static_assert(N % 2 == 0, "order of branch nodes must be even.");
63
  /// we need at least one key on a leaf node
64
  static_assert(M > 0, "number of leaf keys should be >0.");
65
66
67
  static_assert(M < 256, "number of leaf keys should be < 256 (slots are only 8 bit).");
  static_assert(N < 256, "number of branch keys should be < 256 (slots are only 8 bit).");

68
#ifndef UNIT_TESTS
69
 private:
70
#else
71
 public:
72
73
#endif

74
  /// Forward declarations
75
76
77
  struct LeafNode;
  struct BranchNode;

78
  struct Node {
79
    Node() : tag(BLANK){};
80

81
    Node(pptr<LeafNode> leaf_) : tag(LEAF), leaf(leaf_){};
82

83
    Node(pptr<BranchNode> branch_) : tag(BRANCH), branch(branch_){};
84

85
    Node(const Node &other) { copy(other); };
86

87
    void copy(const Node &other) throw() {
88
89
90
91
92
93
94
95
96
97
98
      tag = other.tag;

      switch (tag) {
        case LEAF: {
          leaf = other.leaf;
          break;
        }
        case BRANCH: {
          branch = other.branch;
          break;
        }
99
100
        default:
          break;
101
102
103
      }
    }

104
    Node &operator=(Node other) {
105
106
107
108
      copy(other);
      return *this;
    }

109
    enum NodeType { BLANK, LEAF, BRANCH } tag;
110
    union {
111
112
      pptr<LeafNode> leaf;
      pptr<BranchNode> branch;
113
114
115
116
117
118
    };
  };

  /**
   * A structure for representing a leaf node of a B+ tree.
   */
119
  struct alignas(64) LeafNode {
120
121
122
    /**
     * Constructor for creating a new empty leaf node.
     */
123
    LeafNode() : nextLeaf(nullptr), prevLeaf(nullptr) { slot.get_rw()[0] = 0; }
124

125
126
    static constexpr auto SlotSize = ((M + 1 + 7) / 8) * 8;  ///< round to 8 Byte
    static constexpr auto BitsetSize = ((M + 63) / 64) * 8;  ///< number * size of words
127
128
129
    static constexpr auto SearchSize = SlotSize + BitsetSize + 32;
    static constexpr auto PaddingSize = (64 - SearchSize % 64) % 64;

130
131
132
133
134
135
136
    p<std::array<uint8_t, M + 1>> slot;
    p<dbis::Bitmap<M>> bits;             ///< bitmap for valid entries
    pptr<LeafNode> nextLeaf;             ///< pointer to the subsequent sibling
    pptr<LeafNode> prevLeaf;             ///< pointer to the preceeding sibling
    char padding[PaddingSize];           ///< padding to align keys to 64 bytes
    p<std::array<KeyType, M>> keys;      ///< the actual keys
    p<std::array<ValueType, M>> values;  ///< the actual values
137
138
139
140
141
  };

  /**
   * A structure for representing an branch node (branch node) of a B+ tree.
   */
142
  struct alignas(64) BranchNode {
143
144
145
    /**
     * Constructor for creating a new empty branch node.
     */
146
    BranchNode() { slot.get_rw()[0] = 0; }
147

148
149
    static constexpr auto SlotSize = ((N + 1 + 7) / 8) * 8;  ///< round to 8 Byte
    static constexpr auto BitsetSize = ((N + 63) / 64) * 8;  ///< number * size of words
150
151
152
    static constexpr auto SearchSize = SlotSize + BitsetSize;
    static constexpr auto PaddingSize = (64 - SearchSize % 64) % 64;

153
154
155
156
157
    p<std::array<uint8_t, N + 1>> slot;   ///< slot array for indirection, first = num
    p<dbis::Bitmap<N>> bits;              ///< bitmap for valid entries
    char padding[PaddingSize];            ///< padding to align keys to 64 bytes
    p<std::array<KeyType, N>> keys;       ///< the actual keys
    p<std::array<Node, N + 1>> children;  ///< pointers to child nodes (BranchNode or LeafNode)
158
159
160
161
162
  };

  /**
   * Create a new empty leaf node
   */
163
  pptr<LeafNode> newLeafNode() {
164
    auto pop = (pmem::obj::pool_by_vptr(this));
165
    pptr<LeafNode> newNode = nullptr;
166
167
168
169
170
    if (pmemobj_tx_stage() == TX_STAGE_NONE) {
      transaction::run(pop, [&] {
        newNode = make_persistent<LeafNode>(allocation_flag::class_id(alloc_class.class_id));
      });
    } else {
171
      newNode = make_persistent<LeafNode>(allocation_flag::class_id(alloc_class.class_id));
172
    }
173
174
175
    return newNode;
  }

176
  pptr<LeafNode> newLeafNode(const pptr<LeafNode> &other) {
177
    auto pop = pmem::obj::pool_by_vptr(this);
178
    pptr<LeafNode> newNode = nullptr;
179
180
181
    transaction::run(pop, [&] {
      newNode = make_persistent<LeafNode>(allocation_flag::class_id(alloc_class.class_id), *other);
    });
182
183
184
    return newNode;
  }

185
  void deleteLeafNode(pptr<LeafNode> &node) {
186
    auto pop = pmem::obj::pool_by_vptr(this);
187
188
189
190
191
    if (pmemobj_tx_stage() == TX_STAGE_NONE) {
      transaction::run(pop, [&] { delete_persistent<LeafNode>(node); });
    } else {
      delete_persistent<LeafNode>(node);
    }
192
    node = nullptr;
193
194
195
196
197
  }

  /**
   * Create a new empty branch node
   */
198
  pptr<BranchNode> newBranchNode() {
199
    auto pop = pmem::obj::pool_by_vptr(this);
200
    pptr<BranchNode> newNode = nullptr;
201
202
203
204
205
    if (pmemobj_tx_stage() == TX_STAGE_NONE) {
      transaction::run(pop, [&] {
        newNode = make_persistent<BranchNode>(allocation_flag::class_id(alloc_class.class_id));
      });
    } else {
206
      newNode = make_persistent<BranchNode>(allocation_flag::class_id(alloc_class.class_id));
207
    }
208
209
210
    return newNode;
  }

211
  void deleteBranchNode(pptr<BranchNode> &node) {
212
    auto pop = pmem::obj::pool_by_vptr(this);
213
214
215
216
217
    if (pmemobj_tx_stage() == TX_STAGE_NONE) {
      transaction::run(pop, [&] { delete_persistent<BranchNode>(node); });
    } else {
      delete_persistent<BranchNode>(node);
    }
218
    node = nullptr;
219
220
221
  }

  /**
222
   * A structure for passing information about a node split to the caller.
223
224
   */
  struct SplitInfo {
225
226
227
    KeyType key;      ///< the key at which the node was split
    Node leftChild;   ///< the resulting lhs child node
    Node rightChild;  ///< the resulting rhs child node
228
229
  };

230
231
  static constexpr pobj_alloc_class_desc AllocClass{256, 64, 1, POBJ_HEADER_COMPACT};
  pobj_alloc_class_desc alloc_class;
232
233
  p<unsigned int> depth; /**< the depth of the tree, i.e. the number of levels
                              (0 => rootNode is LeafNode) */
234
  Node rootNode;         ///< pointer to the root node
235
236

 public:
237
238
239
240
  /**
   * Typedef for a function passed to the scan method.
   */
  using ScanFunc = std::function<void(const KeyType &key, const ValueType &val)>;
241

242
243
244
245
  /**
   * Iterator for iterating over the leaf nodes
   */
  class iterator {
246
247
    pptr<LeafNode> currentNode;
    size_t currentPosition;
248

249
   public:
250
    iterator() : currentNode(nullptr), currentPosition(0) {}
251
252
    iterator(const Node &root, size_t d) {
      /// traverse to left-most key
253
254
255
256
257
258
259
      auto node = root;
      while (d-- > 0) {
        auto n = node.branch;
        node = n->children.get_ro()[0];
      }
      currentNode = node.leaf;
      currentPosition = 0;
260
      const auto &nodeBits = currentNode->bits.get_ro();
261
      /// Can not overflow as there are at least M/2 entries
262
      while (!nodeBits.test(currentPosition)) ++currentPosition;
263
264
    }

265
266
    iterator &operator++() {
      if (currentPosition >= M - 1) {
267
268
269
        currentNode = currentNode->nextLeaf;
        currentPosition = 0;
        if (currentNode == nullptr) return *this;
270
        const auto &nodeBits = currentNode->bits.get_ro();
271
272
273
        while (!nodeBits.test(currentPosition)) ++currentPosition;
      } else if (!currentNode->bits.get_ro().test(++currentPosition))
        ++(*this);
274
275
      return *this;
    }
276
277
278
279
280
    iterator operator++(int) {
      iterator retval = *this;
      ++(*this);
      return retval;
    }
281

282
283
284
    bool operator==(iterator other) const {
      return (currentNode == other.currentNode && currentPosition == other.currentPosition);
    }
285

286
    bool operator!=(iterator other) const { return !(*this == other); }
287

288
    std::pair<KeyType, ValueType> operator*() {
289
290
291
      const auto &nodeRef = *currentNode;
      return std::make_pair(nodeRef.keys.get_ro()[currentPosition],
                            nodeRef.values.get_ro()[currentPosition]);
292
293
    }

294
    /// iterator traits
295
296
    using difference_type = long;
    using value_type = std::pair<KeyType, ValueType>;
297
298
    using pointer = const std::pair<KeyType, ValueType> *;
    using reference = const std::pair<KeyType, ValueType> &;
299
300
301
302
    using iterator_category = std::forward_iterator_tag;
  };
  iterator begin() { return iterator(rootNode, depth); }
  iterator end() { return iterator(); }
303

304
305
306
  /**
   * Constructor for creating a new  tree.
   */
307
  explicit wBPTree(struct pobj_alloc_class_desc _alloc) : depth(0), alloc_class(_alloc) {
308
    // wBPTree() : depth(0) {
309
    rootNode = newLeafNode();
310
311
    LOG("created new wBPTree with sizeof(BranchNode) = "
        << sizeof(BranchNode) << ", sizeof(LeafNode) = " << sizeof(LeafNode));
312
313
314
315
316
  }

  /**
   * Destructor for the tree. Should delete all allocated nodes.
   */
317
  ~wBPTree() {}
318
319

  /**
320
321
   * Insert an element (a key-value pair) into the tree. If the key @c key already exists, the
   * corresponding value is replaced by @c val.
322
323
324
325
326
   *
   * @param key the key of the element to be inserted
   * @param val the value that is associated with the key
   */
  void insert(const KeyType &key, const ValueType &val) {
327
328
329
330
331
    SplitInfo splitInfo;

    bool wasSplit = false;
    if (depth == 0) {
      /// the root node is a leaf node
332
      wasSplit = insertInLeafNode(rootNode.leaf, key, val, &splitInfo);
333
334
    } else {
      /// the root node is a branch node
335
      wasSplit = insertInBranchNode(rootNode.branch, depth, key, val, &splitInfo);
336
337
338
339
340
341
    }
    if (wasSplit) {
      /// we had an overflow in the node and therefore the node is split
      const auto root = newBranchNode();
      auto &rootRef = *root;
      auto &rootChilds = rootRef.children.get_rw();
342
343
      auto &rootSlots = rootRef.slot.get_rw();
      auto &rootBits = rootRef.bits.get_rw();
344
345
346
      rootRef.keys.get_rw()[0] = splitInfo.key;
      rootChilds[0] = splitInfo.leftChild;
      rootChilds[N] = splitInfo.rightChild;
347
348
349
      rootSlots[1] = 0;
      rootBits.set(0);
      rootSlots[0] = 1;
350
351
352
      rootNode.branch = root;
      ++depth.get_rw();
    }
353
354
355
  }

  /**
356
   * Find the given @c key in the  tree and if found return the corresponding value.
357
358
   *
   * @param key the key we are looking for
359
   * @param[out] val a pointer to memory where the value is stored if the key was found
360
361
   * @return true if the key was found, false otherwise
   */
362
  bool lookup(const KeyType &key, ValueType *val) {
363
    assert(val != nullptr);
364
365
366
    const auto leaf = findLeafNode(key);
    const auto pos = lookupPositionInLeafNode(leaf, key);
    const auto &leafRef = *leaf;
367
368
    const auto &leafSlots = leafRef.slot.get_ro();
    if (pos <= leafSlots[0] && leafRef.keys.get_ro()[leafSlots[pos]] == key) {
369
      /// we found it!
370
      *val = leafRef.values.get_ro()[leafSlots[pos]];
371
372
373
374
375
376
377
378
379
380
381
382
383
      return true;
    }
    return false;
  }

  /**
   * Delete the entry with the given key @c key from the tree.
   *
   * @param key the key of the entry to be deleted
   * @return true if the key was found and deleted
   */
  bool erase(const KeyType &key) {
    bool deleted = false;
384
385
386
387
388
389
390
391
392
393
    if (depth.get_ro() == 0) {
      /// special case: the root node is a leaf node and there is no need to handle underflow
      auto node = rootNode.leaf;
      assert(node != nullptr);
      deleted = eraseFromLeafNode(node, key);
    } else {
      auto node = rootNode.branch;
      assert(node != nullptr);
      deleted = eraseFromBranchNode(node, depth, key);
    }
394
395
396
397
398
399
400
    return deleted;
  }

  /**
   * Print the structure and content of the tree to stdout.
   */
  void print() const {
401
402
403
404
    if (depth == 0)
      printLeafNode(0, rootNode.leaf);
    else
      printBranchNode(0u, rootNode.branch);
405
406
407
408
409
410
411
412
413
  }

  /**
   * Perform a scan over all key-value pairs stored in the tree.
   * For each entry the given function @func is called.
   *
   * @param func the function called for each entry
   */
  void scan(ScanFunc func) const {
414
    /// we traverse to the leftmost leaf node
415
416
    auto node = rootNode;
    auto d = depth.get_ro();
417
    while (d-- > 0) node = node.branch->children.get_ro()[0];
418
    auto leaf = node.leaf;
419
    while (leaf != nullptr) {
420
      auto &leafRef = *leaf;
421
      /// for each key-value pair call func
422
423
      const auto &keys = leafRef.keys.get_ro();
      const auto &vals = leafRef.values.get_ro();
424
425
426
      /*
      /// via bitmap
      const auto &bits = leafRef.bits.get_ro();
427
428
429
430
      for (auto i = 0u; i < M; ++i) {
        if (!bits.test(i)) continue;
        const auto &key = keys[i];
        const auto &val = vals[i];
431
        func(key, val);
432
433
434
      }*/
      /// via slot array
      const auto &slots = leafRef.slot.get_ro();
435
      for (auto i = 1; i < slots[0] + 1; ++i) {
436
437
438
        const auto &key = keys[slots[i]];
        const auto &val = vals[slots[i]];
        func(key, val);
439
      }
440
441
      /// move to the next leaf node
      leaf = leafRef.nextLeaf;
442
443
444
445
    }
  }

  /**
446
447
   * Perform a range scan over all elements within the range [minKey, maxKey] and for each element
   * call the given function @c func.
448
449
450
451
452
453
   *
   * @param minKey the lower boundary of the range
   * @param maxKey the upper boundary of the range
   * @param func the function called for each entry
   */
  void scan(const KeyType &minKey, const KeyType &maxKey, ScanFunc func) const {
454
    auto leaf = findLeafNode(minKey);
455
456
457

    bool higherThanMax = false;
    while (!higherThanMax && leaf != nullptr) {
458
      const auto &leafRef = *leaf;
459
      /// for each key-value pair within the range call func
460
461
462
463
464
465
      const auto &bits = leafRef.search.get_ro().b;
      const auto &keys = leafRef.keys.get_ro();
      const auto &vals = leafRef.values.get_ro();
      for (auto i = 0u; i < M; ++i) {
        if (!bits.test(i)) continue;
        const auto &key = keys[i];
466
        if (key < minKey) continue;
467
468
469
470
        if (key > maxKey) {
          higherThanMax = true;
          continue;
        }
471
        const auto &val = vals[i];
472
473
        func(key, val);
      }
474
475
      /// move to the next leaf node
      leaf = leafRef.nextLeaf;
476
477
478
479
    }
  }

#ifndef UNIT_TESTS
480
 private:
481
482
483
#endif

  /**
484
485
486
   * Insert a (key, value) pair into the corresponding leaf node. It is the responsibility of the
   * caller to make sure that the node @c node is the correct node. The key is inserted at the
   * correct position.
487
488
489
490
491
492
   *
   * @param node the node where the key-value pair is inserted.
   * @param key the key to be inserted
   * @param val the value associated with the key
   * @param splitInfo information about a possible split of the node
   */
493
494
495
  bool insertInLeafNode(const pptr<LeafNode> &node, const KeyType &key, const ValueType &val,
                        SplitInfo *splitInfo) {
    auto &nodeRef = *node;
496
497
    bool split = false;
    const auto slotPos = lookupPositionInLeafNode(node, key);
498
    const auto &slotArray = nodeRef.slot.get_ro();
499
    if (slotPos <= slotArray[0] && nodeRef.keys.get_ro()[slotArray[slotPos]] == key) {
500
      /// handle insert of duplicates
501
      nodeRef.values.get_rw()[slotArray[slotPos]] = val;
502
503
504
505
      return false;
    }

    if (slotArray[0] == M) {
506
      /// split the node
507
508
      splitLeafNode(node, splitInfo);
      auto &splitRef = *splitInfo;
509

510
      /// insert the new entry
511
512
      if (slotPos < (M + 1) / 2)
        insertInLeafNodeAtPosition(splitRef.leftChild.leaf, slotPos, key, val);
513
      else
514
        insertInLeafNodeAtPosition(splitRef.rightChild.leaf,
515
516
                                   lookupPositionInLeafNode(splitRef.rightChild.leaf, key), key,
                                   val);
517

518
      /// inform the caller about the split
519
      splitRef.key =
520
          splitRef.rightChild.leaf->keys.get_ro()[splitRef.rightChild.leaf->slot.get_ro()[1]];
521
522
      split = true;
    } else {
523
      /// otherwise, we can simply insert the new entry at the given position
524
525
526
527
528
      insertInLeafNodeAtPosition(node, slotPos, key, val);
    }
    return split;
  }

529
530
531
532
533
534
535
  /**
   * Split the given leaf node @c node in the middle and move half of the keys/children to the new
   * sibling node.
   *
   * @param node the leaf node to be split
   * @param splitInfo[out] information about the split
   */
536
  void splitLeafNode(const pptr<LeafNode> &node, SplitInfo *splitInfo) {
537
538
539
    auto &nodeRef = *node;
    /// determine the split position
    constexpr auto middle = (M + 1) / 2;
540

541
    /// move all entries behind this position to a new sibling node
542
    // /*
543
544
    const auto sibling = newLeafNode();
    auto &sibRef = *sibling;
545
546
    auto &sibSlots = sibRef.slot.get_rw();
    auto &sibBits = sibRef.bits.get_rw();
547
548
    auto &sibKeys = sibRef.keys.get_rw();
    auto &sibVals = sibRef.values.get_rw();
549
550
    auto &nodeSlots = nodeRef.slot.get_rw();
    auto &nodeBits = nodeRef.bits.get_rw();
551
552
    const auto &nodeKeys = nodeRef.keys.get_ro();
    const auto &nodeVals = nodeRef.values.get_ro();
553
554
555
556
557
558
559
    sibSlots[0] = nodeSlots[0] - middle;
    nodeSlots[0] = middle;
    for (auto i = 1u; i < sibSlots[0] + 1; ++i) {
      sibSlots[i] = i - 1;
      sibBits.set(sibSlots[i]);
      sibKeys[sibSlots[i]] = nodeKeys[nodeSlots[i + middle]];
      sibVals[sibSlots[i]] = nodeVals[nodeSlots[i + middle]];
560
    }
561
562
563
564
565
    for (auto i = middle; i < M; ++i) nodeBits.reset(nodeSlots[i + 1]);
    PersistEmulation::writeBytes(((M - middle + sibSlots[0] + 7) >> 3) +
                                 (sizeof(KeyType) + sizeof(ValueType) + 1) * sibSlots[0] +
                                 2);  ///< bits ceiled + entries/slots + numKeys
    // */
566
567
568

    /// Alternative: copy node, inverse bitmap and shift slots
    /*
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
    const auto sibling = newLeafNode(node);
    auto &sibRef = *sibling;
    auto &sibSlots = sibRef.slot.get_rw();
    auto &sibBits = sibRef.bits.get_rw();
    const auto &sibKeys = sibRef.keys.get_ro();
    auto &nodeSlots = nodeRef.slot.get_rw();
    auto &nodeBits = nodeRef.bits.get_rw();
    sibSlots[0] = sibSlots[0] - middle;
    nodeSlots[0] = middle;
    for (auto i = middle; i < M; ++i)
      nodeBits.reset(sibSlots[i+1]);
    sibBits = nodeBits;
    sibBits.flip();
    for (auto i = 1u; i < sibSlots[0] + 1; ++i) sibSlots[i] = sibSlots[middle + i];
    PersistEmulation::writeBytes(sizeof(LeafNode) + ((M - middle + M + 7) >> 3) + sibSlots[0] +
                                 2);  // copy leaf + bits ceiled + slots + numKeys
585
586
587
588
589
590
    */

    /// setup the list of leaf nodes
    if (nodeRef.nextLeaf != nullptr) {
      sibRef.nextLeaf = nodeRef.nextLeaf;
      nodeRef.nextLeaf->prevLeaf = sibling;
591
      PersistEmulation::writeBytes<16 * 2>();
592
593
594
    }
    nodeRef.nextLeaf = sibling;
    sibRef.prevLeaf = node;
595
    PersistEmulation::writeBytes<16 * 2>();
Philipp Götze's avatar
Philipp Götze committed
596

597
598
599
    auto &splitRef = *splitInfo;
    splitRef.leftChild = node;
    splitRef.rightChild = sibling;
600
    splitRef.key = sibKeys[sibSlots[1]];
601
602
  }

603
  /**
604
605
   * Insert a (key, value) pair at the given position @c pos into the leaf node @c node. The caller
   * has to ensure that
606
   * - there is enough space to insert the element
607
   * - the key is inserted at the correct position according to the order of keys
608
   *
609
   * @param node the leaf node where the element is to be inserted
610
611
612
613
   * @param pos the position in the leaf node (0 <= pos <= numKeys < M)
   * @param key the key of the element
   * @param val the actual value corresponding to the key
   */
614
615
  void insertInLeafNodeAtPosition(const pptr<LeafNode> &node, const unsigned int pos,
                                  const KeyType &key, const ValueType &val) {
616
    assert(pos <= M);
617
    auto &nodeRef = *node;
618
    // auto &search = nodeRef.search.get_rw();
619
620
    auto &slots = nodeRef.slot.get_rw();
    auto &bits = nodeRef.bits.get_rw();
621
    const auto u = bits.getFreeZero();  ///< unused Entry
622

623
    /// insert the new entry at unused position
624
625
    nodeRef.keys.get_rw()[u] = key;
    nodeRef.values.get_rw()[u] = val;
626
627
628
    PersistEmulation::writeBytes<sizeof(KeyType) + sizeof(ValueType)>();

    /// adapt slot array
629
630
    for (auto j = slots[0]; j >= pos; --j) slots[j + 1] = slots[j];
    PersistEmulation::writeBytes(slots[0] - pos + 1);
631
632
633
    slots[pos] = u;
    bits.set(u);
    ++slots[0];
634
    PersistEmulation::writeBytes<3>();
635
636
637
  }

  /**
638
639
   * Insert a (key, value) pair into the tree recursively by following the path down to the leaf
   * level starting at node @c node at depth @c depth.
640
641
642
643
644
645
646
647
   *
   * @param node the starting node for the insert
   * @param depth the current depth of the tree (0 == leaf level)
   * @param key the key of the element
   * @param val the actual value corresponding to the key
   * @param splitInfo information about the split
   * @return true if a split was performed
   */
648
649
  bool insertInBranchNode(const pptr<BranchNode> &node, const unsigned int depth,
                          const KeyType &key, const ValueType &val, SplitInfo *splitInfo) {
650
651
    SplitInfo childSplitInfo;
    bool split = false, hasSplit = false;
652
    auto &nodeRef = *node;
653
654
    const auto &nodeSlots = nodeRef.slot.get_ro();
    const auto &nodeBits = nodeRef.bits.get_ro();
655
    const auto &nodeChilds = nodeRef.children.get_ro();
656
657

    auto pos = lookupPositionInBranchNode(node, key);
658
659
    if (depth == 1) {
      /// case #1: our children are leaf node
660
      auto child = (pos == nodeSlots[0] + 1) ? nodeChilds[N].leaf : nodeChilds[nodeSlots[pos]].leaf;
661
662
      hasSplit = insertInLeafNode(child, key, val, &childSplitInfo);
    } else {
663
      /// case #2: our children are branch nodes
664
665
      auto child =
          (pos == nodeSlots[0] + 1) ? nodeChilds[N].branch : nodeChilds[nodeSlots[pos]].branch;
666
667
668
669
670
      hasSplit = insertInBranchNode(child, depth - 1, key, val, &childSplitInfo);
    }

    if (hasSplit) {
      auto host = node;
671
      /// the child node was split, thus we have to add a new entry to our branch node
672
      if (nodeSlots[0] == N) {
673
        /// this node is also full and needs to be split
674
        splitBranchNode(node, childSplitInfo.key, splitInfo);
675
676
        const auto &splitRef = *splitInfo;
        host = (key < splitRef.key ? splitRef.leftChild : splitRef.rightChild).branch;
677
678
679
        split = true;
        pos = lookupPositionInBranchNode(host, key);
      }
680
      auto &hostRef = *host;
681
682
      auto &hostKeys = hostRef.keys.get_rw();
      auto &hostChilds = hostRef.children.get_rw();
683
684
      auto &hostSlots = hostRef.slot.get_rw();
      auto &hostBits = hostRef.bits.get_rw();
685
      /// Insert new key and children
686
      const auto u = hostBits.getFreeZero();
687
688
689
690
      hostKeys[u] = childSplitInfo.key;
      hostChilds[u] = childSplitInfo.leftChild;

      /// adapt slot array
691
      if (pos <= hostSlots[0]) {
692
        /// if the child isn't inserted at the rightmost position then we have to make space for it
693
694
        for (auto j = hostSlots[0]; j >= pos; --j) hostSlots[j + 1] = hostSlots[j];
        hostChilds[hostSlots[pos + 1]] = childSplitInfo.rightChild;
695
      } else {
696
        hostChilds[N] = childSplitInfo.rightChild;
697
      }
698
699
700
      hostSlots[pos] = u;
      hostBits.set(u);
      ++hostSlots[0];
701
702
703
704
705
    }
    return split;
  }

  /**
706
707
   * Split the given branch node @c node in the middle and move half of the keys/children to the new
   * sibling node.
708
709
710
711
712
   *
   * @param node the branch node to be split
   * @param splitKey the key on which the split of the child occured
   * @param splitInfo information about the split
   */
713
  void splitBranchNode(const pptr<BranchNode> &node, const KeyType &splitKey,
714
                       SplitInfo *splitInfo) {
715
    auto &nodeRef = *node;
716
    const auto &nodeKeys = nodeRef.keys.get_ro();
717
718
    auto &nodeSlots = nodeRef.slot.get_rw();
    auto &nodeBits = nodeRef.bits.get_rw();
719
720
721
    auto &nodeChilds = nodeRef.children.get_rw();

    /// determine the split position
722
    auto middle = (N + 1) / 2;
723
    if (splitKey > nodeKeys[nodeSlots[middle]]) ++middle;
724

725
    /// move all entries behind this position to a new sibling node
726
    auto sibling = newBranchNode();
727
    auto &sibRef = *sibling;
728
729
    auto &sibKeys = sibRef.keys.get_rw();
    auto &sibChilds = sibRef.children.get_rw();
730
731
732
733
    auto &sibSlots = sibRef.slot.get_rw();
    auto &sibBits = sibRef.bits.get_rw();
    sibSlots[0] = nodeSlots[0] - middle;
    for (auto i = 0u; i < sibSlots[0]; ++i) {
734
      sibSlots[i + 1] = i;  ///< set slot
735
      sibBits.set(i);       ///< set bit
736
      /// set key and children
737
738
      sibKeys[i] = nodeKeys[nodeSlots[middle + i + 1]];
      sibChilds[i] = nodeChilds[nodeSlots[middle + i + 1]];
739
    }
740
    for (auto i = middle; i <= N; ++i) nodeBits.reset(nodeSlots[i]);
741
    nodeSlots[0] = middle - 1;
742

743
744
    /// set new most right children
    sibChilds[N] = nodeChilds[N];
745
    nodeChilds[N] = nodeChilds[nodeSlots[middle]];
746
747
748

    /// set split information
    auto &splitRef = *splitInfo;
749
    splitRef.key = nodeKeys[nodeSlots[middle]];
750
751
    splitRef.leftChild = node;
    splitRef.rightChild = sibling;
752
753
754
  }

  /**
755
756
757
   * Traverse the tree starting at the root until the leaf node is found that could contain the
   * given @key. Note, that always a leaf node is returned even if the key doesn't exist on this
   * node.
758
759
760
761
   *
   * @param key the key we are looking for
   * @return the leaf node that would store the key
   */
762
  pptr<LeafNode> findLeafNode(const KeyType &key) const {
763
764
765
    auto node = rootNode;
    auto d = depth.get_ro();
    while (d-- > 0) {
766
      auto pos = lookupPositionInBranchNode(node.branch, key);
767
      auto &nodeRef = *node.branch;
768
      node = nodeRef.children.get_ro()[nodeRef.slot.get_ro()[pos]];
769
770
771
772
    }
    return node.leaf;
  }
  /**
773
   * Lookup the search key @c key in the given leaf node and return the position.
774
775
776
777
778
   *
   * @param node the leaf node where we search
   * @param key the search key
   * @return the position of the key  (or @c M if not found)
   */
779
  auto lookupPositionInLeafNode(const pptr<LeafNode> &node, const KeyType &key) const {
780
    const auto &nodeRef = *node;
781
    const auto &keys = nodeRef.keys.get_ro();
782
    const auto &slots = nodeRef.slot.get_ro();
783
    return binarySearch<false>(keys, slots, 1, slots[0], key);
784
785
786
  }

  /**
787
788
789
   * Lookup the search key @c key in the given branch node and return the position which is the
   * position in the list of keys + 1. in this way, the position corresponds to the position of the
   * child pointer in the array @children.
790
791
792
793
794
795
796
   * If the search key is less than the smallest key, then @c 0 is returned.
   * If the key is greater than the largest key, then @c numKeys is returned.
   *
   * @param node the branch node where we search
   * @param key the search key
   * @return the position of the key + 1 (or 0 or @c numKey)
   */
797
  auto lookupPositionInBranchNode(const pptr<BranchNode> &node, const KeyType &key) const {
798
    const auto &nodeRef = *node;
799
    const auto &keys = nodeRef.keys.get_ro();
800
    const auto &slots = nodeRef.slot.get_ro();
801
    return binarySearch<true>(keys, slots, 1, slots[0], key);
802
803
804
805
806
807
808
809
810
  }

  /**
   * Delete the element with the given key from the given leaf node.
   *
   * @param node the leaf node from which the element is deleted
   * @param key the key of the element to be deleted
   * @return true of the element was deleted
   */
811
  bool eraseFromLeafNode(const pptr<LeafNode> &node, const KeyType &key) {
812
    auto pos = lookupPositionInLeafNode(node, key);
813
814
815
816
817
818
819
820
    const auto &nodeRef = *node;
    const auto &keys = nodeRef.keys.get_ro();
    const auto &slots = nodeRef.slot.get_ro();
    if (keys[slots[pos]] == key) {
      return eraseFromLeafNodeAtPosition(node, pos, key);
    } else {
      return false;
    }
821
822
823
824
825
826
827
828
829
830
831
832
  }

  /**
   * Delete the element with the given position and key from the given leaf node.
   *
   * @param node the leaf node from which the element is deleted
   * @param pos the position of the key in the node
   * @param key the key of the element to be deleted
   * @return true of the element was deleted
   */
  bool eraseFromLeafNodeAtPosition(const pptr<LeafNode> &node, const unsigned int pos,
                                   const KeyType &key) {
833
    auto &nodeRef = *node;
834
835
    auto &nodeSlots = nodeRef.slot.get_rw();
    auto &nodeBits = nodeRef.bits.get_rw();
836
    // if (nodeRef.keys.get_ro()[nodeSlots[pos]] == key) {
837
838
839
840
841
842
843
    nodeBits.reset(nodeSlots[pos]);
    for (auto i = pos; i < nodeSlots[0] + 1; ++i) {
      nodeSlots[i] = nodeSlots[i + 1];
    }
    --nodeSlots[0];
    PersistEmulation::writeBytes(1 + (nodeSlots[0] + 2 - pos));
    return true;
844
845
    // }
    // return false;
846
847
848
  }

  /**
849
850
   * Delete an entry from the tree by recursively going down to the leaf level and handling the
   * underflows.
851
852
853
854
855
856
   *
   * @param node the current branch node
   * @param d the current depth of the traversal
   * @param key the key to be deleted
   * @return true if the entry was deleted
   */
857
  bool eraseFromBranchNode(const pptr<BranchNode> &node, const unsigned int d, const KeyType &key) {
858
859
    assert(d >= 1);
    bool deleted = false;
860
    const auto &nodeRef = *node;
861
    const auto &nodeChilds = nodeRef.children.get_ro();
862
    const auto &nodeSlots = nodeRef.slot.get_ro();
863
    /// try to find the branch
864
865
    auto pos = lookupPositionInBranchNode(node, key);
    if (d == 1) {
866
      /// the next level is the leaf level
867
      auto leaf = (pos == nodeSlots[0] + 1) ? nodeChilds[N].leaf : nodeChilds[nodeSlots[pos]].leaf;
868
869
      assert(leaf != nullptr);
      deleted = eraseFromLeafNode(leaf, key);
870
      constexpr auto middle = (M + 1) / 2;
871
      /// handle possible underflow
872
      if (leaf->slot.get_ro()[0] < middle) underflowAtLeafLevel(node, pos, leaf);
873
    } else {
874
875
      auto child =
          (pos == nodeSlots[0] + 1) ? nodeChilds[N].branch : nodeChilds[nodeSlots[pos]].branch;
876
877
      deleted = eraseFromBranchNode(child, d - 1, key);
      pos = lookupPositionInBranchNode(node, key);
878
879
      constexpr auto middle = (N + 1) / 2;
      /// handle possible underflow
880
      if (child->slot.get_ro()[0] < middle) {
881
        child = underflowAtBranchLevel(node, pos, child);
882
        if (d == depth.get_ro() && nodeSlots[0] == 0) {
883
          /// special case: the root node is empty now
884
          rootNode = child;
885
          --depth.get_rw();
886
887
888
889
890
891
892
        }
      }
    }
    return deleted;
  }

  /**
893
894
   * Handle the case that during a delete operation a underflow at node @c leaf occured. If possible
   * this is handled
895
896
897
898
   * (1) by rebalancing the elements among the leaf node and one of its siblings
   * (2) if not possible by merging with one of its siblings.
   *
   * @param node the parent node of the node where the underflow occured
899
900
   * @param pos the position in the slot array containing the position in turn of the key from the
   *            left child node @leaf in the @c children array of the branch node
901
902
   * @param leaf the node at which the underflow occured
   */
903
  void underflowAtLeafLevel(const pptr<BranchNode> &node, unsigned int pos, pptr<LeafNode> &leaf) {
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
    auto &nodeRef = *node;
    auto &leafRef = *leaf;
    const auto &nodeSlots = nodeRef.slot.get_ro();
    auto &nodeKeys = nodeRef.keys.get_rw();
    auto prevNumKeys = 0u, nextNumKeys = 0u;
    assert(pos <= nodeSlots[0] + 1);
    constexpr auto middle = (M + 1) / 2;
    /// 1. we check whether we can rebalance with one of the siblings but only if both nodes have
    ///    the same direct parent
    if (pos > 1 && (prevNumKeys = leafRef.prevLeaf->slot.get_ro()[0]) > middle) {
      /// we have a sibling at the left for rebalancing the keys
      balanceLeafNodes(leafRef.prevLeaf, leaf);
      const auto newKey = leafRef.keys.get_ro()[leafRef.slot.get_ro()[1]];
      const auto slotPos = (pos == nodeSlots[0] + 1) ? nodeSlots[0] : pos;
      nodeKeys[nodeSlots[slotPos]] = newKey;
    } else if (pos <= nodeSlots[0] && (nextNumKeys = leafRef.nextLeaf->slot.get_ro()[0]) > middle) {
      /// we have a sibling at the right for rebalancing the keys
      balanceLeafNodes(leafRef.nextLeaf, leaf);
      const auto &nextLeaf = *leafRef.nextLeaf;
      nodeKeys[nodeSlots[pos + 1]] = nextLeaf.keys.get_ro()[nextLeaf.slot.get_ro()[1]];
    }
    /// 2. if this fails we have to merge two leaf nodes but only if both nodes have the same
    ///    direct parent
    else {
      pptr<LeafNode> survivor = nullptr;
      if (pos > 1 && prevNumKeys <= middle) {
        survivor = mergeLeafNodes(leafRef.prevLeaf, leaf);
        deleteLeafNode(leaf);
        --pos;
      } else if (pos <= nodeSlots[0] && nextNumKeys <= middle) {
        /// because we update the pointers in mergeLeafNodes we keep it here
        auto l = leafRef.nextLeaf;
        survivor = mergeLeafNodes(leaf, l);
        deleteLeafNode(l);
      } else
        assert(false);  ///< this shouldn't happen?!

      if (nodeSlots[0] > 1) {
        /// just remove the child node from the current branch node
        auto &nodeSlotsW = nodeRef.slot.get_rw();
        auto &nodeBitsW = nodeRef.bits.get_rw();
        nodeBitsW.reset(nodeSlots[pos]);
        for (auto i = pos; i < nodeSlots[0]; ++i) {
          nodeSlotsW[i] = nodeSlots[i + 1];
948
        }
949
950
951
952
953
954
955
956
        const auto surPos = (pos == nodeSlots[0]) ? N : nodeSlots[pos];
        nodeRef.children.get_rw()[surPos] = survivor;
        --nodeSlotsW[0];
      } else {
        /// This is a special case that happens only if the current node is the root node. Now, we
        /// have to replace the branch root node by a leaf node.
        rootNode = survivor;
        --depth.get_rw();
957
958
      }
    }
959
  }
960
961

  /**
962
963
964
   * Handle the case that during a delete operation a underflow at node @c child occured where @c
   * node is the parent node. If possible this is handled
   * (1) by rebalancing the elements among the node @c child and one of its siblings
965
966
967
   * (2) if not possible by merging with one of its siblings.
   *
   * @param node the parent node of the node where the underflow occured
968
   * @param pos the position of the child node @child in the @c children array of the branch node
969
970
971
   * @param child the node at which the underflow occured
   * @return the (possibly new) child node (in case of a merge)
   */
972
  pptr<BranchNode> underflowAtBranchLevel(const pptr<BranchNode> &node, const unsigned int pos,
973
                                          pptr<BranchNode> &child) {
974
975
    assert(node != nullptr);
    assert(child != nullptr);
976
    auto &nodeRef = *node;
977
978
    const auto &nodeKeys = nodeRef.keys.get_ro();
    const auto &nodeChilds = nodeRef.children.get_ro();
979
    const auto &nodeSlots = nodeRef.slot.get_ro();
980
    auto prevNumKeys = 0u, nextNumKeys = 0u;
981
    constexpr auto middle = (N + 1) / 2;
982