FPTree.hpp 46.2 KB
Newer Older
1
/*
2
 * Copyright (C) 2017-2020 DBIS Group - TU Ilmenau, All Rights Reserved.
3
 *
4
 * This file is part of our NVM-based Data Structures repository.
5
 *
6
7
8
 * This program is free software: you can redistribute it and/or modify it under the terms of the
 * GNU General Public License as published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
9
 *
10
11
12
 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 * See the GNU General Public License for more details.
13
 *
14
15
 * You should have received a copy of the GNU General Public License along with this program.
 * If not, see <http://www.gnu.org/licenses/>.
16
17
18
19
20
21
 */

#ifndef DBIS_FPTree_hpp_
#define DBIS_FPTree_hpp_

#include <array>
22
#include <cmath>
23
24
#include <iostream>

25
#include <libpmemobj/ctl.h>
26
27
28
29
30
31
#include <libpmemobj++/make_persistent.hpp>
#include <libpmemobj++/p.hpp>
#include <libpmemobj++/persistent_ptr.hpp>
#include <libpmemobj++/transaction.hpp>
#include <libpmemobj++/utils.hpp>

32
#include "config.h"
33
#include "utils/Bitmap.hpp"
34
#include "utils/PersistEmulation.hpp"
35
#include "utils/SearchFunctions.hpp"
36

37
namespace dbis::pbptrees {
38

39
using pmem::obj::allocation_flag;
40
41
42
43
44
using pmem::obj::delete_persistent;
using pmem::obj::make_persistent;
using pmem::obj::p;
using pmem::obj::persistent_ptr;
using pmem::obj::transaction;
45
46
template <typename Object>
using pptr = persistent_ptr<Object>;
47
48
49
50
51
52
53
54
55
56
57

/**
 * A persistent memory implementation of a FPTree.
 *
 * @tparam KeyType the data type of the key
 * @tparam ValueType the data type of the values associated with the key
 * @tparam N the maximum number of keys on a branch node
 * @tparam M the maximum number of keys on a leaf node
 */
template<typename KeyType, typename ValueType, int N, int M>
class FPTree {
58
  /// we need at least three keys on a branch node to be able to split
59
  static_assert(N > 2, "number of branch keys has to be >2.");
60
61
  /// we need an even order for branch nodes to be able to merge
  static_assert(N % 2 == 0, "order of branch nodes must be even.");
62
  /// we need at least one key on a leaf node
63
64
65
66
67
68
69
70
  static_assert(M > 0, "number of leaf keys should be >0.");

#ifndef UNIT_TESTS
  private:
#else
  public:
#endif

71
  /// Forward declarations
72
73
74
  struct LeafNode;
  struct BranchNode;

75
76
  struct Node {
    Node() : tag(BLANK) {};
77

78
    Node(const pptr<LeafNode> &leaf_) : tag(LEAF), leaf(leaf_) {};
79

80
    Node(BranchNode *branch_) : tag(BRANCH), branch(branch_) {};
81

82
    Node(const Node &other) { copy(other); };
83

84
    void copy(const Node &other) throw() {
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
      tag = other.tag;

      switch (tag) {
        case LEAF: {
          leaf = other.leaf;
          break;
        }
        case BRANCH: {
          branch = other.branch;
          break;
        }
        default: break;
      }
    }

100
    Node &operator=(Node other) {
101
102
103
104
105
      copy(other);
      return *this;
    }

    enum NodeType {
106
      BLANK, LEAF, BRANCH
107
108
    } tag;
    union {
109
      pptr<LeafNode> leaf;
110
111
112
113
114
115
116
      BranchNode *branch;
    };
  };

  /**
   * A structure for representing a leaf node of a B+ tree.
   */
117
118
119
120
121
  struct alignas(64) LeafNode {

    static constexpr auto NUM_KEYS = M;
    using KEY_TYPE = KeyType;

122
123
124
    /**
     * Constructor for creating a new empty leaf node.
     */
125
126
    LeafNode() : nextLeaf(nullptr), prevLeaf(nullptr) {}

127
128
129
130
131
132
    static constexpr auto BitsetSize = ((M + 63) / 64) * 8;  ///< number * size of words
    static constexpr auto HashSize = ((M + 7) / 8) * 8;  ///< round to 8 Byte
    static constexpr auto SearchSize = BitsetSize + HashSize + 32;
    static constexpr auto PaddingSize = (64 - SearchSize % 64) % 64;

    // p<LeafSearch> search;               ///< helper structure for faster searches
133
    p<dbis::Bitmap<M>> bits;             ///< bitmap for valid entries
134
135
136
137
138
139
    p<std::array<uint8_t, M>> fp;        ///< fingerprint array (n & 0xFF)
    pptr<LeafNode> nextLeaf;             ///< pointer to the subsequent sibling
    pptr<LeafNode> prevLeaf;             ///< pointer to the preceeding sibling
    char padding[PaddingSize];           ///< padding to align keys to 64 bytes
    p<std::array<KeyType, M>> keys;      ///< the actual keys
    p<std::array<ValueType, M>> values;  ///< the actual values
140
141
142
143
144
  };

  /**
   * A structure for representing a branch node (inner node) of a B+ tree.
   */
145
146
147
148
  struct alignas(64) BranchNode {
    /// capacity of a branch node in keys
    static constexpr auto NUM_KEYS = N;

    unsigned int numKeys;              ///< how many entries of @c keys are currently in use
    std::array<KeyType, N> keys;       ///< the separator keys
    std::array<Node, N + 1> children;  ///< child handles (one more than keys)

    /**
     * Create a branch node that holds no keys yet.
     */
    BranchNode() : numKeys{0} {}
  };

  /**
   * Create a new empty leaf node
   */
162
  pptr<LeafNode> newLeafNode() {
    pptr<LeafNode> freshLeaf = nullptr;
    /// allocate transactionally in the pool that contains this tree object
    auto pool = pmem::obj::pool_by_vptr(this);
    transaction::run(pool, [this, &freshLeaf] {
      freshLeaf = make_persistent<LeafNode>(allocation_flag::class_id(alloc_class.class_id));
    });
    return freshLeaf;
  }

171
  /// Create a new leaf node as a copy of the leaf @c other.
  pptr<LeafNode> newLeafNode(const pptr<LeafNode> &other) {
    pptr<LeafNode> leafCopy = nullptr;
    /// allocate transactionally in the pool that contains this tree object
    auto pool = pmem::obj::pool_by_vptr(this);
    transaction::run(pool, [this, &leafCopy, &other] {
      leafCopy = make_persistent<LeafNode>(allocation_flag::class_id(alloc_class.class_id), *other);
    });
    return leafCopy;
  }

180
  void deleteLeafNode(const pptr<LeafNode> &node) {
181
    auto pop = pmem::obj::pool_by_vptr(this);
182
    transaction::run(pop, [&] { delete_persistent<LeafNode>(node); });
183
184
185
186
187
188
  }

  /**
   * Create a new empty branch node
   */
  BranchNode *newBranchNode() {
    /// branch nodes are allocated with plain operator new
    BranchNode *freshBranch = new BranchNode();
    return freshBranch;
  }

192
  void deleteBranchNode(const BranchNode * const node) {
193
    delete node;
194
195
196
  }

  /**
197
   * A structure for passing information about a node split to the caller.
198
199
   */
  struct SplitInfo {
200
201
202
    KeyType key;     ///< the key at which the node was split
    Node leftChild;  ///< the resulting lhs child node
    Node rightChild; ///< the resulting rhs child node
203
204
  };

205
206
207
208
209
210
  /// Predefined allocation class description.
  /// NOTE(review): presumably {unit_size, alignment, units_per_block, header_type} as in PMDK's
  /// pobj_alloc_class_desc - confirm against the libpmemobj ctl documentation.
  static constexpr pobj_alloc_class_desc AllocClass{256, 64, 1, POBJ_HEADER_COMPACT};
  /// allocation class handed to the constructor; its class_id is used for every leaf allocation
  pobj_alloc_class_desc alloc_class;
  unsigned int depth;       /**< the depth of the tree, i.e. the number of levels
                                 (0 => rootNode is LeafNode) */
  Node rootNode;            ///< pointer to the root node
  pptr<LeafNode> leafList;  ///< pointer to the leaf at the most left position (for recovery)
211

212
213
214
  /* -------------------------------------------------------------------------------------------- */

 public:
215
216
217
218
  /**
   * Iterator for iterating over the leaf nodes
   */
  class iterator {
219
220
    pptr<LeafNode> currentNode;
    unsigned int currentPosition;
221
222
223

    public:
    iterator() : currentNode(nullptr), currentPosition(0) {}
224
225
    iterator(const Node &root, unsigned int d) {
      /// traverse to left-most key
226
      auto node = root;
227
      while (d-- > 0) node = node.branch->children[0];
228
229
      currentNode = node.leaf;
      currentPosition = 0;
230
      const auto &nodeBits = currentNode->bits.get_ro();
231
      /// Can not overflow as there are at least M/2 entries
232
      while(!nodeBits.test(currentPosition)) ++currentPosition;
233
234
235
    }

    iterator& operator++() {
236
      if (currentPosition >= M-1) {
237
238
        currentNode = currentNode->nextLeaf;
        currentPosition = 0;
239
        if (currentNode == nullptr) return *this;
240
        const auto &nodeBits = currentNode->bits.get_ro();
241
        while(!nodeBits.test(currentPosition)) ++currentPosition;
242
      } else if (!currentNode->bits.get_ro().test(++currentPosition)) ++(*this);
243
244
245
      return *this;
    }

246
247
248
249
250
251
252
253
254
255
256
    iterator operator++(int) {
      iterator retval = *this;
      ++(*this);
      return retval;
    }

    bool operator==(iterator other) const {
      return (currentNode == other.currentNode && currentPosition == other.currentPosition);
    }

    bool operator!=(iterator other) const { return !(*this == other); }
257
258

    std::pair<KeyType, ValueType> operator*() {
259
260
261
      const auto nodeRef = *currentNode;
      return std::make_pair(nodeRef.keys.get_ro()[currentPosition],
                            nodeRef.values.get_ro()[currentPosition]);
262
263
    }

264
    /// iterator traits
265
266
267
268
269
270
271
272
    using difference_type = long;
    using value_type = std::pair<KeyType, ValueType>;
    using pointer = const std::pair<KeyType, ValueType>*;
    using reference = const std::pair<KeyType, ValueType>&;
    using iterator_category = std::forward_iterator_tag;
  };
  /// Iterator built from the current root node and tree depth.
  iterator begin() {
    return iterator{rootNode, depth};
  }
  /// Default-constructed iterator that marks the end of the iteration.
  iterator end() {
    return iterator{};
  }
273
274
275
276
277
278
279
280

  /* -------------------------------------------------------------------------------------------- */

  /**
   * Typedef for a function passed to the scan method.
   */
  using ScanFunc = std::function<void(const KeyType &key, const ValueType &val)>;

281
282
283
  /**
   * Constructor for creating a new  tree.
   */
284
285
  explicit FPTree(struct pobj_alloc_class_desc _alloc) : depth(0), alloc_class(_alloc) {
  // FPTree() {
286
    rootNode = newLeafNode();
287
288
    leafList = rootNode.leaf;
    depth = 0;
289
290
    LOG("created new FPTree with sizeof(BranchNode) = "
        << sizeof(BranchNode) << ", sizeof(LeafNode) = " << sizeof(LeafNode));
291
292
293
294
295
  }

  /**
   * Destructor for the tree. Should delete all allocated nodes.
   */
296
  /// NOTE(review): the body is empty, i.e. neither branch nodes nor leaf nodes are released
  /// here - confirm whether this is intended.
  ~FPTree() {}
297
298

  /**
299
300
   * Insert an element (a key-value pair) into the tree. If the key @c key already exists, the
   * corresponding value is replaced by @c val.
301
302
303
304
305
   *
   * @param key the key of the element to be inserted
   * @param val the value that is associated with the key
   */
  void insert(const KeyType &key, const ValueType &val) {
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
    SplitInfo splitInfo;

    bool wasSplit = false;
    if (depth == 0) {
      /// the root node is a leaf node
      auto n = rootNode.leaf;
      wasSplit = insertInLeafNode(n, key, val, &splitInfo);
    } else {
      /// the root node is a branch node
      auto n = rootNode.branch;
      wasSplit = insertInBranchNode(n, depth, key, val, &splitInfo);
    }
    if (wasSplit) {
      /// we had an overflow in the node and therefore the node is split
      const auto root = newBranchNode();
      auto &rootRef = *root;
      rootRef.keys[0] = splitInfo.key;
      rootRef.children[0] = splitInfo.leftChild;
      rootRef.children[1] = splitInfo.rightChild;
      ++rootRef.numKeys;
      rootNode.branch = root;
      ++depth;
    }
329
330
331
  }

  /**
332
   * Find the given @c key in the  tree and if found return the corresponding value.
333
334
   *
   * @param key the key we are looking for
335
   * @param[out] val a pointer to memory where the value is stored if the key was found
336
337
338
339
   * @return true if the key was found, false otherwise
   */
  bool lookup(const KeyType &key, ValueType *val)  {
    assert(val != nullptr);
340
341
    const auto leaf = findLeafNode(key);
    const auto pos = lookupPositionInLeafNode(leaf, key);
342
    if (pos < M) {
343
344
345
      /// we found it!
      *val = leaf->values.get_ro()[pos];
      return true;
346
    }
347
    return false;
348
349
350
351
352
353
354
355
356
357
  }

  /**
   * Delete the entry with the given key @c key from the tree.
   *
   * @param key the key of the entry to be deleted
   * @return true if the key was found and deleted
   */
  bool erase(const KeyType &key) {
    bool result;
358
359
360
361
362
363
364
365
366
367
    if (depth == 0) {
      /// special case: the root node is a leaf node and there is no need to handle underflow
      auto node = rootNode.leaf;
      assert(node != nullptr);
      result=eraseFromLeafNode(node, key);
    } else {
      auto node = rootNode.branch;
      assert(node != nullptr);
      result=eraseFromBranchNode(node, depth, key);
    }
368
369
    return result;
  }
370

371
372
373
  /**
   * Recover the FPTree by iterating over the LeafList and using the recoveryInsert method.
   */
374
  void recover() {
375
    LOG("Starting RECOVERY of FPTree");
376
    pptr<LeafNode> currentLeaf = leafList;
377
    if (leafList == nullptr) {
378
      LOG("No data to recover FPTree");
379
      return;
380
    }
381
382
383
384
385
386
    /* counting leafs */
    auto leafs = 0u;
    while(currentLeaf != nullptr) {
      ++leafs;
      currentLeaf = currentLeaf->nextLeaf;
    }
387
388
    // float x = std::log(leafs)/std::log(N+1);
    // assert(x == int(x) && "Not supported for this amount of leafs, yet");
389
390
391
392

    /* actual recovery */
    currentLeaf = leafList;
    if (leafList->nextLeaf == nullptr) {
393
      /// The index has only one node, so the leaf node becomes the root node
394
395
      rootNode = leafList;
      depth = 0;
396
    } else {
397
      rootNode = newBranchNode();
398
      depth = 1;
399
400
      rootNode.branch->children[0] = currentLeaf;
      currentLeaf = currentLeaf->nextLeaf;
401
402
403
404
405
      while (currentLeaf != nullptr) {
        recoveryInsert(currentLeaf);
        currentLeaf = currentLeaf->nextLeaf;
      }
    }
406
    LOG("RECOVERY Done")
407
408
409
410
411
412
  }

  /**
   * Print the structure and content of the tree to stdout.
   */
  void print() const {
413
414
    if (depth == 0) printLeafNode(0, rootNode.leaf);
    else printBranchNode(0u, rootNode.branch);
415
416
417
418
419
420
421
422
  }

  /**
   * Perform a scan over all key-value pairs stored in the tree.
   * For each entry the given function @func is called.
   *
   * @param func the function called for each entry
   */
423
  void scan(ScanFunc func) const {
    /// descend along the first child pointers until the leaf level is reached
    auto cursor = rootNode;
    for (auto level = depth; level > 0; --level) cursor = cursor.branch->children[0];

    /// walk the leaf list and report every occupied slot in slot order
    auto leaf = cursor.leaf;
    while (leaf != nullptr) {
      auto &leafRef = *leaf;
      const auto &occupied = leafRef.bits.get_ro();
      const auto &leafKeys = leafRef.keys.get_ro();
      const auto &leafValues = leafRef.values.get_ro();
      for (auto slot = 0u; slot < M; ++slot) {
        if (occupied.test(slot)) func(leafKeys[slot], leafValues[slot]);
      }
      leaf = leafRef.nextLeaf;
    }
  }
445
446

  /**
447
448
   * Perform a range scan over all elements within the range [minKey, maxKey] and for each element
   * call the given function @c func.
449
450
451
452
453
   *
   * @param minKey the lower boundary of the range
   * @param maxKey the upper boundary of the range
   * @param func the function called for each entry
   */
454
  void scan(const KeyType &minKey, const KeyType &maxKey, ScanFunc func) const {
455
456
457
458
    auto leaf = findLeafNode(minKey);

    bool higherThanMax = false;
    while (!higherThanMax && leaf != nullptr) {
459
460
      /// for each key-value pair within the range call func
      const auto leafRef = *leaf;
461
      const auto &leafBits = leafRef.bits.get_ro();
462
463
      const auto &leafKeys = leafRef.keys.get_ro();
      const auto &leafValues = leafRef.values.get_ro();
464
      for (auto i = 0u; i < M; i++) {
465
466
        if (!leafBits.test(i)) continue;
        auto &key = leafKeys[i];
467
468
469
        if (key < minKey) continue;
        if (key > maxKey) { higherThanMax = true; continue; }

470
        auto &val = leafValues[i];
471
472
        func(key, val);
      }
473
474
      /// move to the next leaf node
      leaf = leafRef.nextLeaf;
475
    }
476
  }
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491

#ifndef UNIT_TESTS
  private:
#endif

  /**
   * Insert a (key, value) pair into the corresponding leaf node. It is the
   * responsibility of the caller to make sure that the node @c node is
   * the correct node. The key is inserted at the correct position.
   *
   * @param node the node where the key-value pair is inserted.
   * @param key the key to be inserted
   * @param val the value associated with the key
   * @param splitInfo information about a possible split of the node
   */
492
  bool insertInLeafNode(const pptr<LeafNode> &node, const KeyType &key,
493
      const ValueType &val, SplitInfo *splitInfo) {
494
    auto &nodeRef = *node;
495
496
497
    bool split = false;
    auto pos = lookupPositionInLeafNode(node, key);

498
    if (pos < M) {
499
      /// handle insert of duplicates
500
      nodeRef.values.get_rw()[pos] = val;
501
      return false;
502
    }
503
    pos = nodeRef.bits.get_ro().getFreeZero();
504
    if (pos == M) {
505
506
507
508
509
510
511
      /* split the node */
      splitLeafNode(node, splitInfo);
      auto &splitRef = *splitInfo;
      auto sibling= splitRef.rightChild.leaf;
      auto &sibRef= *sibling;

      /* insert the new entry */
512
      if (key > splitRef.key) {
513
        insertInLeafNodeAtPosition(sibling, sibRef.bits.get_ro().getFreeZero(), key, val);
514
      } else {
515
        if (key > nodeRef.keys.get_ro()[findMaxKeyPos(nodeRef.keys.get_ro(), nodeRef.bits.get_ro())]) {
516
          /// Special case: new key would be the middle, thus must be right
517
          insertInLeafNodeAtPosition(sibling, sibRef.bits.get_ro().getFreeZero(), key, val);
518
519
          splitRef.key = key;
        } else {
520
          insertInLeafNodeAtPosition(node, nodeRef.bits.get_ro().getFreeZero(), key, val);
521
522
        }
      }
523
      /* inform the caller about the split */
524
525
      split = true;
    } else {
526
      /* otherwise, we can simply insert the new entry at the given position */
527
528
529
530
531
      insertInLeafNodeAtPosition(node, pos, key, val);
    }
    return split;
  }

532
533
534
535
536
537
538
  /**
   * Split the given leaf node @c node in the middle and move half of the keys/children to the new
   * sibling node.
   *
   * @param node the leaf node to be split
   * @param splitInfo[out] information about the split
   */
539
  void splitLeafNode(const pptr<LeafNode> &node, SplitInfo *splitInfo) {
540
541
542
    auto &nodeRef = *node;

    /* determine the split position by finding median in unsorted array of keys*/
543
544
    const auto [bitmap, splitPos] = findSplitKey<KeyType, M>(nodeRef.keys.get_ro().data());
    const auto &splitKey = nodeRef.keys.get_ro()[splitPos];
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575

    /// copy leaf
    /*
    const auto sibling = newLeafNode(node);
    auto &sibRef = *sibling;
    nodeRef.bits.get_rw() = bitmap;
    sibRef.bits.get_rw() = bitmap.flip();
    PersistEmulation::writeBytes<sizeof(LeafNode) + ((2*M+7)>>3)>(); /// copy leaf + 2 bitmaps
    */

    /// Alternative: move instead of complete copy
    ///*
    const auto sibling = newLeafNode();
    auto &sibRef = *sibling;
    auto &sibBits = sibRef.bits.get_rw();
    auto &sibHashs = sibRef.fp.get_rw();
    auto &sibKeys = sibRef.keys.get_rw();
    auto &sibValues = sibRef.values.get_rw();
    auto &nodeBits = nodeRef.bits.get_rw();
    const auto &nodeHashs = nodeRef.fp.get_ro();
    const auto &nodeKeys = nodeRef.keys.get_ro();
    const auto &nodeValues = nodeRef.values.get_ro();
    auto j = 0u;
    for (auto i = 0u; i < M; i++) {
      if (nodeKeys[i] >= splitKey) {
        sibKeys[j] = nodeKeys[i];
        sibValues[j] = nodeValues[i];
        sibHashs[j] = nodeHashs[i];
        sibBits.set(j);
        nodeBits.reset(i);
        j++;
576
      }
577
578
579
580
581
582
583
584
585
586
587
588
589
590
    }
    PersistEmulation::writeBytes(j * (sizeof(KeyType) + sizeof(ValueType) + 1) +
                                 ((j * 2 + 7) >> 3));  /// j entries/hashes + j*2 bits
    // */

    /// setup the list of leaf nodes
    if (nodeRef.nextLeaf != nullptr) {
      sibRef.nextLeaf = nodeRef.nextLeaf;
      nodeRef.nextLeaf->prevLeaf = sibling;
      PersistEmulation::writeBytes<16 * 2>();
    }
    nodeRef.nextLeaf = sibling;
    sibRef.prevLeaf = node;
    PersistEmulation::writeBytes<16 * 2>();
591

592
593
594
595
596
    /// set split information
    auto &splitRef = *splitInfo;
    splitRef.leftChild = node;
    splitRef.rightChild = sibling;
    splitRef.key = splitKey;
597
598
  }

599
600
601
602
603
604
605
606
607
608
609
610
  /**
   * Insert a (key, value) pair at the given position @c pos into the leaf node
   * @c node. The caller has to ensure that
   * - there is enough space to insert the element
   * - the key is inserted at the correct position according to the order of
   * keys
   *
   * @param node the leaf node where the element is to be inserted
   * @param pos the position in the leaf node (0 <= pos <= numKeys < M)
   * @param key the key of the element
   * @param val the actual value corresponding to the key
   */
611
  void insertInLeafNodeAtPosition(const pptr<LeafNode> &node, const unsigned int pos,
612
                                  const KeyType &key, const ValueType &val) {
613
    assert(pos < M);
614
    auto &nodeRef = *node;
615

616
617
618
    /// insert the new entry at the given position
    nodeRef.keys.get_rw()[pos] = key;
    nodeRef.values.get_rw()[pos] = val;
619

620
    /// set bit and hash
621
622
    nodeRef.bits.get_rw().set(pos);
    nodeRef.fp.get_rw()[pos] = fpHash(key);
623
    PersistEmulation::writeBytes<sizeof(KeyType) + sizeof(ValueType) + 2>();
624
625
626
627
628
629
630
  }

  /**
   * Insert a (key, value) pair into the tree recursively by following the path
   * down to the leaf level starting at node @c node at depth @c depth.
   *
   * @param node the starting node for the insert
631
   * @param d the current depth of the tree (0 == leaf level)
632
633
634
635
636
   * @param key the key of the element
   * @param val the actual value corresponding to the key
   * @param splitInfo information about the split
   * @return true if a split was performed
   */
637
638
  bool insertInBranchNode(BranchNode * const node, unsigned int d, const KeyType &key,
                          const ValueType &val, SplitInfo *splitInfo) {
    auto &branchRef = *node;
    SplitInfo childSplit;
    bool childWasSplit = false;

    /// descend into the child responsible for the key
    auto pos = lookupPositionInBranchNode(node, key);
    if (d == 1) {
      /// the children are leaf nodes
      auto leafChild = branchRef.children[pos].leaf;
      childWasSplit = insertInLeafNode(leafChild, key, val, &childSplit);
    } else {
      /// the children are branch nodes
      auto branchChild = branchRef.children[pos].branch;
      childWasSplit = insertInBranchNode(branchChild, d - 1, key, val, &childSplit);
    }
    /// without a split below there is nothing to register in this node
    if (!childWasSplit) return false;

    /// the split of the child yields one more key/child pair for this node; if this node is
    /// full it is split as well and the pair goes into the matching half
    bool nodeWasSplit = false;
    auto host = node;
    if (branchRef.numKeys == N) {
      splitBranchNode(node, childSplit.key, splitInfo);
      const auto &info = *splitInfo;
      host = (key < info.key ? info.leftChild : info.rightChild).branch;
      nodeWasSplit = true;
      pos = lookupPositionInBranchNode(host, key);
    }

    auto &hostRef = *host;
    if (pos < hostRef.numKeys) {
      /// not appended at the right end: shift keys and children one position to the right
      hostRef.children[hostRef.numKeys + 1] = hostRef.children[hostRef.numKeys];
      for (auto i = hostRef.numKeys; i > pos; i--) {
        hostRef.children[i] = hostRef.children[i - 1];
        hostRef.keys[i] = hostRef.keys[i - 1];
      }
    }
    /// register the split key together with both halves of the split child
    hostRef.keys[pos] = childSplit.key;
    hostRef.children[pos] = childSplit.leftChild;
    hostRef.children[pos + 1] = childSplit.rightChild;
    ++hostRef.numKeys;

    return nodeWasSplit;
  }

  /**
   * Split the given branch node @c node in the middle and move half of the keys/children to the new
   * sibling node.
   *
   * @param node the branch node to be split
   * @param splitKey the key on which the split of the child occurred
   * @param splitInfo information about the split
   */
691
692
693
694
695
696
697
698
699
700
701
702
703
704
  void splitBranchNode(BranchNode * const node, const KeyType &splitKey, SplitInfo *splitInfo) {
    auto &leftRef = *node;

    /// start in the middle and move one position to the right if the key to be inserted
    /// afterwards is larger than the key stored there
    auto pivot = (N + 1) / 2;
    if (splitKey > leftRef.keys[pivot]) ++pivot;

    /// everything from the pivot position on moves to a new sibling
    const auto rightNode = newBranchNode();
    auto &rightRef = *rightNode;
    rightRef.numKeys = leftRef.numKeys - pivot;
    for (auto i = 0u; i < rightRef.numKeys; ++i) {
      rightRef.keys[i] = leftRef.keys[pivot + i];
      rightRef.children[i] = leftRef.children[pivot + i];
    }
    /// the last child pointer has no key of its own and is moved separately
    rightRef.children[rightRef.numKeys] = leftRef.children[leftRef.numKeys];
    /// the key in front of the pivot leaves this node; it is reported as the split key below
    leftRef.numKeys = pivot - 1;

    auto &info = *splitInfo;
    info.key = leftRef.keys[pivot - 1];
    info.leftChild = node;
    info.rightChild = rightNode;
  }

  /**
716
717
718
   * Traverse the tree starting at the root until the leaf node is found that could contain the
   * given @key. Note, that always a leaf node is returned even if the key doesn't exist on this
   * node.
719
720
721
722
   *
   * @param key the key we are looking for
   * @return the leaf node that would store the key
   */
723
  pptr<LeafNode> findLeafNode(const KeyType &key) const {
    auto cursor = rootNode;
    /// one branch node is passed per level of depth
    for (auto level = depth; level > 0; --level) {
      const auto branch = cursor.branch;
      const auto childPos = lookupPositionInBranchNode(branch, key);
      cursor = branch->children[childPos];
    }
    return cursor.leaf;
  }
  /**
734
   * Lookup the search key @c key in the given leaf node and return the position.
735
   * If the search key was not found, then @c M is returned.
736
737
738
   *
   * @param node the leaf node where we search
   * @param key the search key
739
   * @return the position of the key  (or @c M if not found)
740
   */
741
742
  auto lookupPositionInLeafNode(const pptr<LeafNode> &node, const KeyType &key) const {
    auto pos = 0u;
Philipp Götze's avatar
Philipp Götze committed
743
    const auto &nodeRef = *node;
744
    const auto hash = fpHash(key);
Philipp Götze's avatar
Philipp Götze committed
745
    const auto &keys = nodeRef.keys.get_ro();
746
747
748
749
750
    const auto &bits = nodeRef.bits.get_ro();
    const auto &hashs = nodeRef.fp.get_ro();

    for (; pos < M; ++pos)
      if (hashs[pos] == hash && bits.test(pos) && keys[pos] == key) break;
751
752
753
754
    return pos;
  }

  /**
755
756
757
   * Lookup the search key @c key in the given branch node and return the position which is the
   * position in the list of keys + 1. in this way, the position corresponds to the position of the
   * child pointer in the array @children.
758
759
760
   * If the search key is less than the smallest key, then @c 0 is returned.
   * If the key is greater than the largest key, then @c numKeys is returned.
   *
761
   * @param node the branch node where we search
762
763
764
   * @param key the search key
   * @return the position of the key + 1 (or 0 or @c numKey)
   */
765
766
  auto lookupPositionInBranchNode(const BranchNode * const node, const KeyType &key) const {
    const auto num = node->numKeys;
767
    const auto &keys = node->keys;
768
769
770
    // auto pos = 0u;
    // for (; pos < num && keys[pos] <= key; ++pos);
    // return pos;
771
    return binarySearch<true>(keys, 0, num - 1, key);
772
773
774
775
776
777
778
779
780
  }

  /**
   * Delete the element with the given key from the given leaf node.
   *
   * @param node the leaf node from which the element is deleted
   * @param key the key of the element to be deleted
   * @return true if the element was deleted
   */
781
  bool eraseFromLeafNode(const pptr<LeafNode> &node, const KeyType &key) {
782
    auto pos = lookupPositionInLeafNode(node, key);
783
784
785
786
787
788
789
790
791
792
793
794
795
    return eraseFromLeafNodeAtPosition(node, pos, key);
  }

  /**
   * Delete the element with the given position and key from the given leaf node.
   *
   * @param node the leaf node from which the element is deleted
   * @param pos the position of the key in the node
   * @param key the key of the element to be deleted
   * @return true if the element was deleted
   */
  bool eraseFromLeafNodeAtPosition(const pptr<LeafNode> &node, const unsigned int pos,
                                   const KeyType &key) {
796
    if (pos < M) {
797
798
      node->bits.get_rw().reset(pos);
      PersistEmulation::writeBytes<1>();
799
      return true;
800
    }
801
    return false;
802
803
804
  }

  /**
805
806
   * Delete an entry from the tree by recursively going down to the leaf level and handling the
   * underflows.
807
808
809
810
811
812
   *
   * @param node the current branch node
   * @param d the current depth of the traversal
   * @param key the key to be deleted
   * @return true if the entry was deleted
   */
813
814
  bool eraseFromBranchNode(BranchNode * const node, unsigned int d, const KeyType &key) {
    assert(d >= 1);
    auto &branchRef = *node;

    /// find the child that is responsible for the key
    auto pos = lookupPositionInBranchNode(node, key);

    if (d == 1) {
      /// the children are leaves: delete there and check the fill level afterwards
      auto leaf = branchRef.children[pos].leaf;
      assert(leaf != nullptr);
      const bool removed = eraseFromLeafNode(leaf, key);
      constexpr auto minFill = (M + 1) / 2;
      if (leaf->bits.get_ro().count() < minFill) underflowAtLeafLevel(node, pos, leaf);
      return removed;
    }

    /// the children are branch nodes: recurse one level down
    auto child = branchRef.children[pos].branch;
    const bool removed = eraseFromBranchNode(child, d - 1, key);

    /// the position is looked up again after the recursive call
    pos = lookupPositionInBranchNode(node, key);
    constexpr auto minFill = (N + 1) / 2;
    if (child->numKeys < minFill) {
      child = underflowAtBranchLevel(node, pos, child);
      if (d == depth && branchRef.numKeys == 0) {
        /// this node is the root and ran out of keys: the returned child becomes the new root
        rootNode = child;
        --depth;
      }
    }
    return removed;
  }

  /**
   * Handle the case that during a delete operation an underflow at node @c leaf occurred.
   * If possible this is handled
   * (1) by rebalancing the elements among the leaf node and one of its siblings
   * (2) if not possible by merging with one of its siblings.
   *
   * @param node the parent node of the node where the underflow occurred
   * @param pos the position of the child node @c leaf in the @c children array of the branch node
   * @param leaf the node at which the underflow occurred
   */
  void underflowAtLeafLevel(BranchNode * const node, unsigned int pos, const pptr<LeafNode> &leaf) {
    auto &nodeRef = *node;
    auto &leafRef = *leaf;
    assert(pos <= nodeRef.numKeys);
    constexpr auto middle = (M + 1) / 2;

    /// 1. we check whether we can rebalance with one of the siblings but only if both nodes have
    ///    the same direct parent (pos > 0: left sibling, pos < numKeys: right sibling)
    if (pos > 0 && leafRef.prevLeaf->bits.get_ro().count() > middle) {
      /// we have a sibling at the left for rebalancing the keys
      balanceLeafNodes(leafRef.prevLeaf, leaf);
      /// the separator left of @c leaf becomes the smallest key now stored in @c leaf
      nodeRef.keys[pos - 1] =
          leafRef.keys.get_ro()[findMinKeyPos(leafRef.keys.get_ro(), leafRef.bits.get_ro())];
      return;
    }
    if (pos < nodeRef.numKeys && leafRef.nextLeaf->bits.get_ro().count() > middle) {
      /// we have a sibling at the right for rebalancing the keys
      balanceLeafNodes(leafRef.nextLeaf, leaf);
      auto &nextLeaf = *leafRef.nextLeaf;
      /// the separator right of @c leaf becomes the smallest key left in the right sibling
      nodeRef.keys[pos] =
          nextLeaf.keys.get_ro()[findMinKeyPos(nextLeaf.keys.get_ro(), nextLeaf.bits.get_ro())];
      return;
    }

    /// 2. if this fails we have to merge two leaf nodes but only if both nodes have the same
    ///    direct parent
    pptr<LeafNode> survivor = nullptr;
    if (pos > 0 && leafRef.prevLeaf->bits.get_ro().count() <= middle) {
      survivor = mergeLeafNodes(leafRef.prevLeaf, leaf);
      deleteLeafNode(leaf);
    } else if (pos < nodeRef.numKeys && leafRef.nextLeaf->bits.get_ro().count() <= middle) {
      /// because we update the pointers in mergeLeafNodes we keep it here
      auto l = leafRef.nextLeaf;
      survivor = mergeLeafNodes(leaf, l);
      deleteLeafNode(l);
    } else {
      /// this shouldn't happen; when asserts are compiled out we must not continue, otherwise
      /// a null survivor would be installed in the parent (or as root) below
      assert(false);
      return;
    }

    if (nodeRef.numKeys > 1) {
      /// after a merge with the left sibling the survivor sits one slot to the left
      if (pos > 0) pos--;
      /// just remove the child node from the current branch node: drop the separator at
      /// @c pos and the child slot right of it by shifting the remaining entries left
      for (auto i = pos; i < nodeRef.numKeys - 1; i++) {
        nodeRef.keys[i] = nodeRef.keys[i + 1];
        nodeRef.children[i + 1] = nodeRef.children[i + 2];
      }
      nodeRef.children[pos] = survivor;
      --nodeRef.numKeys;
    } else {
      /// This is a special case that happens only if the current node is the root node.
      /// Now, we have to replace the branch root node by a leaf node.
      rootNode = survivor;
      --depth;
    }
  }

  /**
   * Handle the case that during a delete operation an underflow at node @c child occurred where
   * @c node is the parent node. If possible this is handled
   * (1) by rebalancing the elements among the node @c child and one of its siblings
   * (2) if not possible by merging with one of its siblings.
   *
   * @param node the parent node of the node where the underflow occurred
   * @param pos the position of the child node @c child in the @c children array of the branch node
   * @param child the node at which the underflow occurred
   * @return the (possibly new) child node (in case of a merge)
   */
  BranchNode* underflowAtBranchLevel(BranchNode * const node, unsigned int pos,
                                     BranchNode * const child) {
    assert(node != nullptr);
    assert(child != nullptr);
    auto &nodeRef = *node;
    constexpr auto middle = (N + 1) / 2;

    /// 1. we check whether we can rebalance with one of the siblings
    if (pos > 0 && nodeRef.children[pos - 1].branch->numKeys > middle) {
      /// we have a sibling at the left for rebalancing the keys
      auto &sibling = nodeRef.children[pos - 1].branch;
      balanceBranchNodes(sibling, child, node, pos - 1);
      return child;
    }
    if (pos < nodeRef.numKeys && nodeRef.children[pos + 1].branch->numKeys > middle) {
      /// we have a sibling at the right for rebalancing the keys
      auto &sibling = nodeRef.children[pos + 1].branch;
      balanceBranchNodes(sibling, child, node, pos);
      return child;
    }

    /// 2. if this fails we have to merge two branch nodes
    BranchNode *lSibling = nullptr, *rSibling = nullptr;
    unsigned int prevKeys = 0, nextKeys = 0;
    if (pos > 0) {
      lSibling = nodeRef.children[pos - 1].branch;
      prevKeys = lSibling->numKeys;
    }
    if (pos < nodeRef.numKeys) {
      rSibling = nodeRef.children[pos + 1].branch;
      nextKeys = rSibling->numKeys;
    }

    BranchNode *survivor = nullptr;     ///< node that holds the merged content afterwards
    BranchNode *witnessNode = nullptr;  ///< node that became obsolete and is deleted below
    unsigned int removedChildPos = 0;   ///< slot of @c witnessNode in the parent's children
    if (prevKeys > 0) {
      /// merge @c child into its left sibling; the separator between them is keys[pos - 1]
      mergeBranchNodes(lSibling, nodeRef.keys[pos - 1], child);
      survivor = lSibling;
      witnessNode = child;
      removedChildPos = pos;
    } else if (nextKeys > 0) {
      /// merge the right sibling into @c child; the separator between them is keys[pos]
      mergeBranchNodes(child, nodeRef.keys[pos], rSibling);
      survivor = child;
      witnessNode = rSibling;
      removedChildPos = pos + 1;
    } else {
      /// shouldn't happen; when asserts are compiled out leave the parent untouched instead of
      /// dropping a key and deleting a null node
      assert(false);
      return child;
    }

    /// remove the obsolete child slot and the separator key directly left of it by shifting
    /// the entries behind them one position to the left
    for (auto i = removedChildPos; i < nodeRef.numKeys; i++) {
      nodeRef.keys[i - 1] = nodeRef.keys[i];
      nodeRef.children[i] = nodeRef.children[i + 1];
    }
    nodeRef.numKeys--;

    deleteBranchNode(witnessNode);
    return survivor;
  }

  /**
   * Redistribute (key, value) pairs from the leaf node @c donor to the leaf node @c receiver such
   * that both nodes have approx. the same number of elements. This method is used in case of an
   * underflow situation of a leaf node.
   *
   * @param donor the leaf node from which the elements are taken
   * @param receiver the sibling leaf node getting the elements from @c donor
   */
  void balanceLeafNodes(const pptr<LeafNode> &donor, const pptr<LeafNode> &receiver) {
    auto &donorRef = *donor;
    auto &receiverRef = *receiver;
994
995
    const auto dNumKeys = donorRef.bits.get_ro().count();
    const auto rNumKeys = receiverRef.bits.get_ro().count();
996
997
998
    assert(dNumKeys > rNumKeys);
    unsigned int balancedNum = (dNumKeys + rNumKeys) / 2;
    unsigned int toMove = dNumKeys - balancedNum;
999
1000
    if (toMove == 0) return;