FPTree.hpp 58.2 KB
Newer Older
1
/*
2
 * Copyright (C) 2017-2019 DBIS Group - TU Ilmenau, All Rights Reserved.
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
 *
 * This file is part of our NVM-based Data Structure Repository.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef DBIS_FPTree_hpp_
#define DBIS_FPTree_hpp_

#include <array>
24
#include <bitset>
25
26
27
28
29
30
31
32
#include <iostream>

#include <libpmemobj++/make_persistent.hpp>
#include <libpmemobj++/p.hpp>
#include <libpmemobj++/persistent_ptr.hpp>
#include <libpmemobj++/transaction.hpp>
#include <libpmemobj++/utils.hpp>

33
#include "config.h"
34
35
#include "utils/ElementOfRankK.hpp"

36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#define BRANCH_PADDING  0
#define LEAF_PADDING    0

namespace dbis::fptree {

using pmem::obj::delete_persistent;
using pmem::obj::make_persistent;
using pmem::obj::p;
using pmem::obj::persistent_ptr;
using pmem::obj::transaction;

/**
 * A persistent memory implementation of a FPTree.
 *
 * @tparam KeyType the data type of the key
 * @tparam ValueType the data type of the values associated with the key
 * @tparam N the maximum number of keys on a branch node
 * @tparam M the maximum number of keys on a leaf node
 */
template<typename KeyType, typename ValueType, int N, int M>
class FPTree {
  // a branch node must be able to hold more than two keys (N > 2) to be split
  static_assert(N > 2, "number of branch keys has to be >2.");
  // we need at least one key on a leaf node
  static_assert(M > 0, "number of leaf keys should be >0.");

#ifndef UNIT_TESTS
  private:
#else
  public:
#endif

  // Forward declarations
  struct LeafNode;
  struct BranchNode;
  struct LowestBranchNode;


  /**
   * A tagged reference to a node of the tree. The @c tag records which of the
   * union members was set by the constructor or by the last copy: a
   * persistent_ptr to a LeafNode, a BranchNode pointer or a LowestBranchNode
   * pointer. A default-constructed instance is BLANK and refers to nothing.
   *
   * The converting constructors are intentionally implicit; the tree code
   * assigns leaf and branch pointers directly to LeafOrBranchNode objects.
   */
  struct LeafOrBranchNode {
    /** Creates a blank reference; no union member is initialized. */
    LeafOrBranchNode() : tag(BLANK) {}

    /** Wraps a leaf node. */
    LeafOrBranchNode(persistent_ptr<LeafNode> leaf_) : tag(LEAF), leaf(leaf_) {}

    /** Wraps a branch node. */
    LeafOrBranchNode(BranchNode *branch_) : tag(BRANCH), branch(branch_) {}

    /** Wraps a lowest branch node. */
    LeafOrBranchNode(LowestBranchNode *lowestbranch_) : tag(LOWESTBRANCH), lowestbranch(lowestbranch_) {}

    /** Copies the tag and the union member selected by it. */
    LeafOrBranchNode(const LeafOrBranchNode &other) { copy(other); }

    /**
     * Takes over the tag of @c other and the union member that the tag
     * selects. For a BLANK source only the tag is copied.
     *
     * Declared noexcept instead of the deprecated dynamic exception
     * specification throw(), which was removed from the language in C++20.
     */
    void copy(const LeafOrBranchNode &other) noexcept {
      tag = other.tag;

      switch (tag) {
        case LEAF: {
          leaf = other.leaf;
          break;
        }
        case BRANCH: {
          branch = other.branch;
          break;
        }
        case LOWESTBRANCH: {
          lowestbranch = other.lowestbranch;
          break;
        }
        default: break;
      }
    }

    /** Assigns by delegating to copy(); self-assignment is harmless. */
    LeafOrBranchNode &operator=(const LeafOrBranchNode &other) {
      copy(other);
      return *this;
    }

    enum NodeType {
      BLANK, LEAF, BRANCH, LOWESTBRANCH
    } tag;                              //< which union member is in use
    union {
      persistent_ptr<LeafNode> leaf;    //< used when tag == LEAF
      BranchNode *branch;               //< used when tag == BRANCH
      LowestBranchNode *lowestbranch;   //< used when tag == LOWESTBRANCH
    };
  };

120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
  /* By Herb Sutter */
  /**
   * Wraps a value of type T so that it starts on a 64-byte boundary.
   *
   * Because the only member is declared alignas(64), the alignment of the
   * struct is 64 and its size is rounded up to a multiple of 64. An explicit
   * padding array is therefore not needed; the previous one became a
   * zero-length array (not valid ISO C++) whenever sizeof(T) >= 64. Size and
   * alignment are the same as before.
   *
   * @tparam T the type of the wrapped value
   */
  template<typename T>
  struct CacheLineStorage {
    alignas(64) T data;  //< the wrapped value
  };
  /**
   * Per-leaf search helper: a validity bitmap plus one fingerprint byte per
   * slot.
   */
  struct LeafSearch {
    std::bitset<M> b;            //< bitset for valid entries
    std::array<std::byte, M> fp; //< fingerprint array (n & 0xFF)

    /**
     * @return the index of the first slot whose validity bit is not set,
     *         or @c M if every slot is in use
     */
    unsigned int getFreeZero() const {
      for (unsigned int slot = 0; slot < M; ++slot) {
        if (!b.test(slot)) return slot;
      }
      return M;
    }
  };

137
138
139
140
141
142
143
  /**
   * A structure for representing a leaf node of a B+ tree.
   */
  struct LeafNode {
    /**
     * Constructor for creating a new empty leaf node.
     */
144
145
146
147
148
149
150
151
    LeafNode() : nextLeaf(nullptr), prevLeaf(nullptr) {}

    p<CacheLineStorage<LeafSearch>> search;//< helper structure for faster searches
    persistent_ptr<LeafNode> nextLeaf;     //< pointer to the subsequent sibling
    persistent_ptr<LeafNode> prevLeaf;     //< pointer to the preceeding sibling
    p<std::array<KeyType, M>> keys;        //< the actual keys
    p<std::array<ValueType, M>> values;    //< the actual values
    p<unsigned char> pad_[LEAF_PADDING];   //<
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
  };

  /**
   * Inner (branch) node of the tree whose children are referenced through
   * LeafOrBranchNode handles.
   */
  struct BranchNode {
    /**
     * Creates an empty branch node (no keys).
     */
    BranchNode()
      : numKeys(0) {}

    unsigned int numKeys;                         //< number of keys currently stored
    std::array<KeyType, N> keys;                  //< the separator keys
    std::array<LeafOrBranchNode, N + 1> children; //< child references, one more than keys
    unsigned char pad_[BRANCH_PADDING];           //< optional padding (BRANCH_PADDING bytes)
  };

  /**
   * Branch node of the level directly above the leaves; its children are
   * always leaf nodes, referenced by persistent pointers.
   */
  struct LowestBranchNode {
    /**
     * Creates an empty lowest branch node (no keys).
     */
    LowestBranchNode()
      : numKeys(0) {}

    unsigned int numKeys;                                   //< number of keys currently stored
    std::array<KeyType, N> keys;                            //< the separator keys
    std::array<persistent_ptr<LeafNode>, N + 1> children;   //< leaf children, one more than keys
    unsigned char pad_[BRANCH_PADDING];                     //< optional padding (BRANCH_PADDING bytes)
  };

  /**
   * Allocate a new, empty leaf node inside a transaction on the pool that
   * contains this tree object.
   *
   * @return persistent pointer to the new leaf
   */
  persistent_ptr<LeafNode> newLeafNode() {
    persistent_ptr<LeafNode> fresh = nullptr;
    auto pool = pmem::obj::pool_by_vptr(this);
    transaction::run(pool, [&fresh] {
      fresh = make_persistent<LeafNode>();
    });
    return fresh;
  }

  /**
   * Allocate a new leaf node that is a copy of @c other, inside a
   * transaction on the pool that contains this tree object.
   *
   * @param other the leaf whose content is copied
   * @return persistent pointer to the copy
   */
  persistent_ptr<LeafNode> newLeafNode(const persistent_ptr<LeafNode> &other) {
    persistent_ptr<LeafNode> duplicate = nullptr;
    auto pool = pmem::obj::pool_by_vptr(this);
    transaction::run(pool, [&duplicate, &other] {
      duplicate = make_persistent<LeafNode>(*other);
    });
    return duplicate;
  }

206

207
208
209
  /**
   * Release the given leaf node inside a transaction on the pool that
   * contains this tree object.
   *
   * @param node the leaf to delete
   */
  void deleteLeafNode(persistent_ptr<LeafNode> node) {
    auto pool = pmem::obj::pool_by_vptr(this);
    transaction::run(pool, [&node] {
      delete_persistent<LeafNode>(node);
    });
  }

  /**
   * Allocate a new, empty branch node with operator new.
   *
   * @return pointer to the new node; release it with deleteBranchNode()
   */
  BranchNode *newBranchNode() {
    BranchNode *fresh = new BranchNode();
    return fresh;
  }

  /**
   * Release a branch node obtained from newBranchNode().
   *
   * @param node the node to delete
   */
  void deleteBranchNode(BranchNode *node) {
    delete node;
  }

  /**
   * Allocate a new, empty lowest branch node with operator new.
   *
   * @return pointer to the new node; release it with deleteLowestBranchNode()
   */
  LowestBranchNode *newLowestBranchNode() {
    LowestBranchNode *fresh = new LowestBranchNode();
    return fresh;
  }

  /**
   * Release a lowest branch node obtained from newLowestBranchNode().
   *
   * @param node the node to delete
   */
  void deleteLowestBranchNode(LowestBranchNode *node) {
    delete node;
  }

  /**
   * A structure for passing information about a node split to
   * the caller. It is filled by the insert/split helpers when a node
   * overflows and read by the parent level to add the new separator key
   * and the two resulting children.
   */
  struct SplitInfo {
    KeyType key;                 //< the key at which the node was split
    LeafOrBranchNode leftChild;  //< the resulting lhs child node
    LeafOrBranchNode rightChild; //< the resulting rhs child node
  };

  unsigned int depth;         //< the depth of the tree, i.e. the number of branch levels above the leaves (0 => rootNode is a LeafNode, 1 => rootNode is a LowestBranchNode)

  LeafOrBranchNode rootNode;     //< reference to the root node
  persistent_ptr<LeafNode> leafList; //< pointer to the leftmost leaf; necessary for recovery

  PROFILE_DECL

  public:
  /**
   * Typedef for a function passed to the scan method.
   */
  using ScanFunc = std::function<void(const KeyType &key, const ValueType &val)>;
  /**
   * Forward iterator over all valid (key, value) entries of the tree. Leaves
   * are visited from left to right through their @c nextLeaf links; inside a
   * leaf the slots are visited in index order, which is not necessarily key
   * order because new entries are placed in the first free slot.
   *
   * A default-constructed iterator (null leaf, position 0) is the end
   * iterator. Leaves without any valid entry are skipped, so an empty tree
   * yields begin() == end().
   */
  class iterator {
    persistent_ptr<LeafNode> currentNode;
    std::size_t currentPosition;

    /**
     * Starting at the current slot, move forward to the next slot whose
     * validity bit is set, hopping to the following leaves when a leaf is
     * exhausted. If no valid entry is left, the iterator becomes the end
     * iterator. The position is bounds-checked before every bitmap test,
     * since std::bitset::test throws for positions >= M.
     */
    void skipToValidEntry() {
      while (currentNode != nullptr) {
        const auto &validBits = currentNode->search.get_ro().data.b;
        while (currentPosition < validBits.size() && !validBits.test(currentPosition))
          ++currentPosition;
        if (currentPosition < validBits.size()) return;
        currentNode = currentNode->nextLeaf;
        currentPosition = 0;
      }
    }

    public:
    /** Creates the end iterator. */
    iterator() : currentNode(nullptr), currentPosition(0) {}

    /**
     * Creates an iterator positioned at the first valid entry of the leftmost
     * leaf below @c root.
     *
     * @param root the root of the tree
     * @param d the depth of the tree (0 => root is a leaf,
     *          1 => root is a lowest branch node)
     */
    iterator(const LeafOrBranchNode &root, std::size_t d)
      : currentNode(nullptr), currentPosition(0) {
      // Descend along the leftmost children, mirroring findLeafNode():
      // branch nodes while more than one level remains, then exactly one
      // step through a lowest branch node.
      auto node = root;
      while (d > 1) {
        node = node.branch->children[0];
        --d;
      }
      if (d == 1) {
        node = node.lowestbranch->children[0];
      }
      currentNode = node.leaf;
      skipToValidEntry();
    }

    /** Advances to the next valid entry; incrementing end() is a no-op. */
    iterator& operator++() {
      if (currentNode == nullptr) return *this;
      ++currentPosition;
      skipToValidEntry();
      return *this;
    }
    iterator operator++(int) {iterator retval = *this; ++(*this); return retval;}

    bool operator==(iterator other) const {return (currentNode == other.currentNode &&
        currentPosition == other.currentPosition);}
    bool operator!=(iterator other) const {return !(*this == other);}

    /** @return a copy of the (key, value) pair at the current slot */
    std::pair<KeyType, ValueType> operator*() {

      return std::make_pair(currentNode->keys.get_ro()[currentPosition], currentNode->values.get_ro()[currentPosition]);
    }

    // iterator traits
    using difference_type = long;
    using value_type = std::pair<KeyType, ValueType>;
    using pointer = const std::pair<KeyType, ValueType>*;
    using reference = const std::pair<KeyType, ValueType>&;
    using iterator_category = std::forward_iterator_tag;
  };
  /** @return an iterator constructed from the root node and the current depth */
  iterator begin() {
    iterator first(rootNode, depth);
    return first;
  }
  /** @return a default-constructed iterator, used as the past-the-end marker */
  iterator end() {
    return iterator{};
  }
  /**
   * Constructor for creating a new  tree.
   */
  FPTree() {
    rootNode = newLeafNode();
318
319
    leafList = rootNode.leaf;
    depth = 0;
320
    PROFILE_INIT
321
322
    LOG("created new FPTree with sizeof(BranchNode) = " << sizeof(BranchNode)
                            <<  ", sizeof(LeafNode) = " << sizeof(LeafNode));
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
  }

  /**
   * Destructor for the tree. Should delete all allocated nodes.
   *
   * The body is empty, i.e. this destructor itself releases nothing.
   * NOTE(review): the comment below refers to leafPool and branchPool, but
   * no such members are visible in this part of the file, and branch nodes
   * are allocated with plain new (see newBranchNode / newLowestBranchNode).
   * Confirm that the nodes are actually released somewhere.
   */
  ~FPTree() {
    // Nodes are deleted automatically by releasing leafPool and branchPool.
  }

  /**
   * Insert an element (a key-value pair) into the tree. If the key @c key
   * already exists, the corresponding value is replaced by @c val.
   *
   * @param key the key of the element to be inserted
   * @param val the value that is associated with the key
   */
  void insert(const KeyType &key, const ValueType &val) {
    auto pop = pmem::obj::pool_by_vptr(this);
    transaction::run(pop, [&] {
342
			SplitInfo splitInfo;
343

344
345
      bool wasSplit = false;
      if (depth == 0) {
346
347
348
        // the root node is a leaf node
        auto n = rootNode.leaf;
        wasSplit = insertInLeafNode(n, key, val, &splitInfo);
349
      } else if (depth == 1) {
350
351
352
        //the root node is a lowest branch node
        auto n = rootNode.lowestbranch;
        wasSplit = insertInLowestBranchNode(n, key, val, &splitInfo);
353
      } else {
354
355
356
        // the root node is a branch node
        auto n = rootNode.branch;
        wasSplit = insertInBranchNode(n, depth, key, val, &splitInfo);
357
358
      }
      if (wasSplit) {
359
360
        // we had an overflow in the node and therefore the node is split
        if (depth == 0) {
361
362
363
364
365
366
367
368
        	auto root = newLowestBranchNode();

        	root->keys[0] = splitInfo.key;
        	root->children[0] = splitInfo.leftChild.leaf;
        	root->children[1] = splitInfo.rightChild.leaf;
        	root->numKeys = root->numKeys + 1;
        	rootNode.lowestbranch = root;
        	depth = depth + 1;
369
370
371
372
373
374
375
376
377
        } else {
          auto root = newBranchNode();

          root->keys[0] = splitInfo.key;
          root->children[0] = splitInfo.leftChild;
          root->children[1] = splitInfo.rightChild;
          root->numKeys = root->numKeys + 1;
          rootNode.branch = root;
          depth = depth + 1;
378
379
				}
      }
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
    });
  }

  /**
   * Find the given @c key in the  tree and if found return the
   * corresponding value.
   *
   * @param key the key we are looking for
   * @param[out] val a pointer to memory where the value is stored
   *                 if the key was found
   * @return true if the key was found, false otherwise
   */
  bool lookup(const KeyType &key, ValueType *val)  {
    assert(val != nullptr);

    bool result = false;
    auto leafNode = findLeafNode(key);
    auto pos = lookupPositionInLeafNode(leafNode, key);
398
    if (pos < M) {
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
      // we found it!
      *val = leafNode->values.get_ro()[pos];
      result = true;
    }
    return result;
  }

  /**
   * Remove the entry with key @c key from the tree. The work is dispatched
   * by the depth of the tree and runs inside one transaction on the pool
   * that contains this tree object.
   *
   * @param key the key of the entry to be deleted
   * @return true if the key was found and deleted
   */
  bool erase(const KeyType &key) {
    bool erased = false;
    auto pool = pmem::obj::pool_by_vptr(this);
    transaction::run(pool, [&] {
      switch (depth) {
        case 0: {
          // the root is a leaf: no underflow handling is needed
          auto leaf = rootNode.leaf;
          assert(leaf != nullptr);
          erased = eraseFromLeafNode(leaf, key);
          break;
        }
        case 1: {
          // the root is a lowest branch node
          auto lowest = rootNode.lowestbranch;
          assert(lowest != nullptr);
          erased = eraseFromLowestBranchNode(lowest, key);
          break;
        }
        default: {
          // the root is a regular branch node
          auto inner = rootNode.branch;
          assert(inner != nullptr);
          erased = eraseFromBranchNode(inner, depth, key);
          break;
        }
      }
    });
    return erased;
  }
  /**
   * Recover the FPTree by iterating over the LeafList and using the recoveryInsert method.
   */
  void recover(){
440
    LOG("Starting RECOVERY of FPTree");
441
    persistent_ptr<LeafNode> currentLeaf = leafList;
442
443
    if(leafList == nullptr){
      LOG("No data to recover FPTree");
444
    }
445
    if(leafList->nextLeaf == nullptr){
446
      // The index has only one node, so the leaf node becomes the root node
447
448
      rootNode = leafList;
      depth = 0;
449
    } else {
450
      rootNode = newLowestBranchNode();
451
452
453
454
455
456
457
      depth = 1;
      rootNode.lowestbranch->children[0] = currentLeaf;
      while (currentLeaf != nullptr) {
        recoveryInsert(currentLeaf);
        currentLeaf = currentLeaf->nextLeaf;
      }
    }
458
    LOG("RECOVERY Done")
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
  }

  /**
   * Print the structure and content of the tree, starting at the root and
   * choosing the print helper that matches the root's node kind.
   */
  void print() const {
    switch (depth) {
      case 0:
        printLeafNode(0, rootNode.leaf);
        break;
      case 1:
        printLowestBranchNode(0, rootNode.lowestbranch);
        break;
      default:
        printBranchNode(0u, rootNode.branch);
        break;
    }
  }

  PROFILE_PRINT

  /**
   * Perform a scan over all key-value pairs stored in the tree.
   * For each entry the given function @func is called.
   *
   * @param func the function called for each entry
   */
483
  void scan(ScanFunc func) const {
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
    // we traverse to the leftmost leaf node
    auto node = rootNode;
    auto d = depth;
    while ( d > 1) {
      // as long as we aren't at the leaf level we follow the path down
      node = node.branch->children.get_ro()[0];
      d--;
    }
    if (d == 1) {
      node = node.lowestbranch->children.get_ro()[0];
    }
    auto leaf = node.leaf;
    while (leaf != nullptr) {
      // for each key-value pair call func
      for (auto i = 0u; i < leaf->numKeys.get_ro(); i++) {
        const auto &key = leaf->keys.get_ro()[i];
        const auto &val = leaf->values.get_ro()[i];
        func(key, val);
      }
      // move to the next leaf node
      leaf = leaf->nextLeaf;
    }
506
  }
507
508
509
510
511
512
513
514
515

  /**
   * Perform a range scan over all elements within the range [minKey, maxKey]
   * and for each element call the given function @c func.
   *
   * @param minKey the lower boundary of the range
   * @param maxKey the upper boundary of the range
   * @param func the function called for each entry
   */
516
  void scan(const KeyType &minKey, const KeyType &maxKey, ScanFunc func) const {
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
    auto leaf = findLeafNode(minKey);

    bool higherThanMax = false;
    while (!higherThanMax && leaf != nullptr) {
      // for each key-value pair within the range call func
      for (auto i = 0u; i < M; i++) {
        if (!leaf->search.get_ro().data.b.test(i)) continue;
        auto &key = leaf->keys.get_ro()[i];
        if (key < minKey) continue;
        if (key > maxKey) { higherThanMax = true; continue; }

        auto &val = leaf->values.get_ro()[i];
        func(key, val);
      }
      // move to the next leaf node
      leaf = leaf->nextLeaf;
    }
534
  }
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554

#ifndef UNIT_TESTS
  private:
#endif

  /**
   * Insert a (key, value) pair into the given leaf node. The caller is
   * responsible for passing the leaf that is in charge of @c key.
   *
   * - If the key is already present, only its value is overwritten.
   * - If a free slot exists, the entry is stored there.
   * - Otherwise the leaf is split: a copy of the leaf becomes the right
   *   sibling, entries greater than the median stay valid only in the
   *   sibling and the others only in @c node, the new entry goes to the
   *   matching half, and the sibling is linked in after @c node.
   *
   * @param node the node where the key-value pair is inserted
   * @param key the key to be inserted
   * @param val the value associated with the key
   * @param splitInfo receives the split key and both halves if a split happened
   * @return true if the leaf was split
   */
  bool insertInLeafNode(persistent_ptr<LeafNode> node, const KeyType &key,
      const ValueType &val, SplitInfo *splitInfo) {
    const auto existing = lookupPositionInLeafNode(node, key);
    if (existing < M) {
      // duplicate key: replace the value in place
      node->values.get_rw()[existing] = val;
      return false;
    }

    const auto freeSlot = node->search.get_ro().data.getFreeZero();
    if (!(freeSlot == M)) {
      // there is room left, no split required
      insertInLeafNodeAtPosition(node, freeSlot, key, val);
      return false;
    }

    // the node is full, so we must split it;
    // the split key is the median of the unsorted key array
    auto data = node->keys.get_ro();
    KeyType middle = ElementOfRankK::elementOfRankK((M + 1) / 2, data, 0, M);

    // the sibling starts as a copy; each entry is then invalidated in
    // exactly one of the two leaves
    persistent_ptr<LeafNode> sibling = newLeafNode(node);
    for (auto slot = 0u; slot < M; slot++) {
      KeyType slotKey = node->keys.get_ro()[slot];
      auto &loser = (slotKey > middle) ? node : sibling;
      loser->search.get_rw().data.b.reset(slot);
    }

    // place the new entry in the half it belongs to
    const auto rightMin = sibling->keys.get_ro()[findMinKeyAtLeafNode(sibling)];
    auto &target = (key < rightMin) ? node : sibling;
    insertInLeafNodeAtPosition(target, target->search.get_ro().data.getFreeZero(), key, val);

    // hook the sibling into the list of leaf nodes
    if (node->nextLeaf != nullptr) {
      sibling->nextLeaf = node->nextLeaf;
      node->nextLeaf->prevLeaf = sibling;
    }
    node->nextLeaf = sibling;
    sibling->prevLeaf = node;

    // report the split to the caller
    splitInfo->leftChild = node;
    splitInfo->rightChild = sibling;
    splitInfo->key = rightMin;
    return true;
  }

  /**
   * Store a (key, value) pair in slot @c pos of the leaf node @c node: the
   * slot is marked valid, its fingerprint is set to fpHash(key), and key and
   * value are written. Any previous content of the slot is overwritten, so
   * the caller has to pass a slot that may be used.
   *
   * @param node the leaf node where the element is to be inserted
   * @param pos the slot in the leaf node (pos < M, checked by assert)
   * @param key the key of the element
   * @param val the actual value corresponding to the key
   */
  void insertInLeafNodeAtPosition(persistent_ptr<LeafNode> node, unsigned int pos,
      const KeyType &key, const ValueType &val) {
    assert(pos < M);

    // mark the slot as used and remember the fingerprint of the key
    auto &meta = node->search.get_rw().data;
    meta.b.set(pos);
    meta.fp[pos] = fpHash(key);

    // store the entry itself
    node->keys.get_rw()[pos] = key;
    node->values.get_rw()[pos] = val;
  }

  /**
   * Insert a (key, value) pair below the lowest branch node @c node, i.e.
   * into one of its leaf children. If the leaf is split, the new separator
   * and the two leaves are added to this node, which is split first when it
   * is already full.
   *
   * @param node the starting node for the insert
   * @param key the key of the element
   * @param val the actual value corresponding to the key
   * @param splitInfo receives information about the split of @c node
   * @return true if @c node itself was split
   */
  bool insertInLowestBranchNode(LowestBranchNode *node,
      const KeyType &key, const ValueType &val,
      SplitInfo *splitInfo) {
    SplitInfo leafSplit;

    auto slot = lookupPositionInLowestBranchNode(node, key);

    // our children are leaf nodes
    auto leaf = node->children[slot];
    if (!insertInLeafNode(leaf, key, val, &leafSplit)) {
      return false;
    }

    // the leaf was split, thus a new entry has to be added to a branch node
    bool nodeWasSplit = false;
    LowestBranchNode *target = node;
    if (node->numKeys == N) {
      // no room left: split this node and continue in the matching half
      splitLowestBranchNode(node, leafSplit.key, splitInfo);

      auto &half = (key < splitInfo->key) ? splitInfo->leftChild
                                          : splitInfo->rightChild;
      target = half.lowestbranch;
      nodeWasSplit = true;
      slot = lookupPositionInLowestBranchNode(target, key);
    }
    if (slot < target->numKeys) {
      // not the rightmost position: shift keys and children one to the right
      target->children[target->numKeys + 1] = target->children[target->numKeys];
      for (auto i = target->numKeys; i > slot; i--) {
        target->children[i] = target->children[i - 1];
        target->keys[i] = target->keys[i - 1];
      }
    }

    // finally, add the new entry at the computed position
    target->keys[slot] = leafSplit.key;
    target->children[slot] = leafSplit.leftChild.leaf;
    target->children[slot + 1] = leafSplit.rightChild.leaf;
    target->numKeys = target->numKeys + 1;
    return nodeWasSplit;
  }

  /**
   * Insert a (key, value) pair recursively below the branch node @c node at
   * depth @c depth. If the visited child is split, the new separator and the
   * two halves are added to this node, which is split first when it is
   * already full.
   *
   * @param node the starting node for the insert
   * @param depth the depth of @c node (its children are lowest branch nodes
   *              when depth is 2, branch nodes otherwise)
   * @param key the key of the element
   * @param val the actual value corresponding to the key
   * @param splitInfo receives information about the split of @c node
   * @return true if @c node itself was split
   */
  bool insertInBranchNode(BranchNode *node, unsigned int depth,
      const KeyType &key, const ValueType &val,
      SplitInfo *splitInfo) {
    SplitInfo childSplit;

    auto slot = lookupPositionInBranchNode(node, key);
    bool childWasSplit = false;
    if (depth == 2) {
      // case #1: our children are lowest branch nodes
      childWasSplit = insertInLowestBranchNode(node->children[slot].lowestbranch,
                                               key, val, &childSplit);
    } else {
      // case #2: our children are branch nodes
      childWasSplit = insertInBranchNode(node->children[slot].branch,
                                         depth - 1, key, val, &childSplit);
    }
    if (!childWasSplit) {
      return false;
    }

    // the child was split, thus a new entry has to be added to a branch node
    bool nodeWasSplit = false;
    BranchNode *target = node;
    if (node->numKeys == N) {
      // no room left: split this node and continue in the matching half
      splitBranchNode(node, childSplit.key, splitInfo);

      auto &half = (key < splitInfo->key) ? splitInfo->leftChild
                                          : splitInfo->rightChild;
      target = half.branch;
      nodeWasSplit = true;
      slot = lookupPositionInBranchNode(target, key);
    }
    if (slot < target->numKeys) {
      // not the rightmost position: shift keys and children one to the right
      target->children[target->numKeys + 1] = target->children[target->numKeys];
      for (auto i = target->numKeys; i > slot; i--) {
        target->children[i] = target->children[i - 1];
        target->keys[i] = target->keys[i - 1];
      }
    }

    // finally, add the new entry at the computed position
    target->keys[slot] = childSplit.key;
    target->children[slot] = childSplit.leftChild;
    target->children[slot + 1] = childSplit.rightChild;
    target->numKeys = target->numKeys + 1;
    return nodeWasSplit;
  }

  /**
   * Split the given lowest branch node @c node around its middle: the keys
   * and children behind the cut position move to a newly allocated sibling,
   * the key in front of the cut becomes the separator reported to the caller.
   *
   * @param node the lowest branch node to be split
   * @param splitKey the key on which the split of the child occurred
   * @param splitInfo receives the separator key and both halves
   */
  void splitLowestBranchNode(LowestBranchNode *node, const KeyType &splitKey,
      SplitInfo *splitInfo) {
    // cut position, shifted by one if the key to insert lies right of it
    unsigned int cut = (N + 1) / 2;
    if (splitKey > node->keys[cut]) cut++;

    // everything from the cut position onwards moves to the new sibling
    LowestBranchNode *right = newLowestBranchNode();
    right->numKeys = node->numKeys - cut;
    for (auto i = 0u; i < right->numKeys; i++) {
      right->keys[i] = node->keys[cut + i];
      right->children[i] = node->children[cut + i];
    }
    // the last child pointer has no key of its own
    right->children[right->numKeys] = node->children[node->numKeys];

    // the key just before the cut leaves the node and becomes the separator
    node->numKeys = cut - 1;
    splitInfo->key = node->keys[cut - 1];
    splitInfo->leftChild = node;
    splitInfo->rightChild = right;
  }

  /**
   * Split the given branch node @c node around its middle: the keys and
   * children behind the cut position move to a newly allocated sibling, the
   * key in front of the cut becomes the separator reported to the caller.
   *
   * @param node the branch node to be split
   * @param splitKey the key on which the split of the child occurred
   * @param splitInfo receives the separator key and both halves
   */
  void splitBranchNode(BranchNode *node, const KeyType &splitKey,
      SplitInfo *splitInfo) {
    // cut position, shifted by one if the key to insert lies right of it
    unsigned int cut = (N + 1) / 2;
    if (splitKey > node->keys[cut]) cut++;

    // everything from the cut position onwards moves to the new sibling
    BranchNode *right = newBranchNode();
    right->numKeys = node->numKeys - cut;
    for (auto i = 0u; i < right->numKeys; i++) {
      right->keys[i] = node->keys[cut + i];
      right->children[i] = node->children[cut + i];
    }
    // the last child pointer has no key of its own
    right->children[right->numKeys] = node->children[node->numKeys];

    // the key just before the cut leaves the node and becomes the separator
    node->numKeys = cut - 1;
    splitInfo->key = node->keys[cut - 1];
    splitInfo->leftChild = node;
    splitInfo->rightChild = right;
  }

  /**
   * Walk from the root down to the leaf node that is in charge of @c key.
   * A leaf is returned in any case, whether or not it actually contains
   * the key.
   *
   * @param key the key we are looking for
   * @return the leaf node that would store the key
   */
  persistent_ptr<LeafNode> findLeafNode(const KeyType &key) const {
    auto current = rootNode;

    auto level = depth;
    for (; level > 1; level--) {
      // above the lowest branch level: descend through regular branch nodes
      auto inner = current.branch;
      auto childIdx = lookupPositionInBranchNode(inner, key);
      current = inner->children[childIdx];
    }
    if (level == 1) {
      // one more step from the lowest branch level to the leaf level
      auto lowest = current.lowestbranch;
      auto childIdx = lookupPositionInLowestBranchNode(lowest, key);
      current = lowest->children[childIdx];
    }
    return current.leaf;
  }
  /**
   * Find the slot of @c key in the given leaf node. A slot matches if it is
   * marked valid, its fingerprint equals fpHash(key) and its key equals
   * @c key; the checks are made in this order.
   *
   * @param node the leaf node where we search
   * @param key the search key
   * @return the slot of the key, or @c M if the key is not stored in the leaf
   */
  unsigned int lookupPositionInLeafNode(persistent_ptr<LeafNode> node,
      const KeyType &key) const {
    const auto hash = fpHash(key);
    const auto &meta = node->search.get_ro().data;
    const auto &leafKeys = node->keys.get_ro();

    for (unsigned int slot = 0; slot < M; slot++) {
      const bool hit = meta.b.test(slot) &&
                       meta.fp[slot] == hash &&
                       leafKeys[slot] == key;
      if (hit) return slot;
    }
    return M;
  }

  /**
   * Determine which child of the given lowest branch node is in charge of
   * @c key. The result is the number of leading keys that are less than or
   * equal to @c key and can be used directly as an index into @c children:
   * @c 0 if the key is smaller than all keys, @c numKeys if it is not
   * smaller than any of them.
   *
   * @param node the lowest branch node where we search
   * @param key the search key
   * @return the index of the child to follow (0 .. numKeys)
   */
  unsigned int lookupPositionInLowestBranchNode(LowestBranchNode *node,
      const KeyType &key) const {
    // plain linear search over the keys
    unsigned int idx = 0;
    while (idx < node->numKeys && node->keys[idx] <= key) {
      idx++;
    }
    return idx;
  }

  /**
   * Lookup the search key @c key in the given branch node and return the
   * position which is the position in the list of keys + 1. In this way, the
   * position corresponds to the position of the child pointer in the
   * array @c children.
   * If the search key is less than the smallest key, then @c 0 is returned.
   * If the key is greater than the largest key, then @c numKeys is returned.
   *
   * @param node the branch node where we search
   * @param key the search key
   * @return the position of the key + 1 (or 0 or @c numKeys)
   */
  unsigned int lookupPositionInBranchNode(BranchNode *node,
      const KeyType &key) const {
    // simple linear search (a binary search could be tried instead):
    // count the leading keys that are not greater than the search key
    auto idx = 0u;
    while (idx < node->numKeys && node->keys[idx] <= key) {
      ++idx;
    }
    // idx now names the child pointer to descend into
    return idx;
  }

  /**
   * Delete the element with the given key from the given leaf node.
   *
   * @param node the leaf node from which the element is deleted
   * @param key the key of the element to be deleted
   * @return true if the element was deleted
   */
  bool eraseFromLeafNode(persistent_ptr <LeafNode> node, const KeyType &key) {
    const auto slot = lookupPositionInLeafNode(node, key);
    // the lookup yields M when the key is not stored in this leaf
    if (slot >= M) {
      return false;
    }
    // deletion only clears the slot's bit in the bitmap; the fingerprint
    // and key entries of the slot are not touched here
    node->search.get_rw().data.b.reset(slot);
    return true;
  }

  /**
   * Delete an entry from the tree by recursively going down to the leaf level
   * and handling the underflows.
   *
   * @param node the current lowest branch node
   * @param key the key to be deleted
   * @return true if the entry was deleted
   */
  bool eraseFromLowestBranchNode(LowestBranchNode *node, const KeyType &key) {
    // pick the child slot on the path to the key; its target is a leaf
    auto childIdx = lookupPositionInLowestBranchNode(node, key);
    auto leaf = node->children[childIdx];
    assert(leaf != nullptr);

    const bool removed = eraseFromLeafNode(leaf, key);

    // the fill check runs whether or not the key was actually found:
    // fewer than (M + 1) / 2 set bitmap bits counts as an underflow
    const unsigned int minEntries = (M + 1) / 2;
    if (leaf->search.get_ro().data.b.count() < minEntries) {
      // handle underflow
      underflowAtLeafLevel(node, childIdx, leaf);
    }

    return removed;
  }

  /**
   * Delete an entry from the tree by recursively going down to the leaf level
   * and handling the underflows.
   *
   * @param node the current branch node
   * @param d the current depth of the traversal
   * @param key the key to be deleted
   * @return true if the entry was deleted
   */
  bool eraseFromBranchNode(BranchNode *node, unsigned int d, const KeyType &key) {
    assert(d >= 1);
    // child slot on the path towards the key
    auto childIdx = lookupPositionInBranchNode(node, key);

    if (d != 2) {
      // the next level still consists of regular branch nodes: recurse
      auto subtree = node->children[childIdx].branch;
      const bool removed = eraseFromBranchNode(subtree, d - 1, key);

      // resolve the child slot again once the recursive call has returned
      childIdx = lookupPositionInBranchNode(node, key);
      const unsigned int minKeys = (N + 1) / 2;
      if (subtree->numKeys < minKeys) {
        // handle underflow; continue with the node handed back by the handler
        subtree = underflowAtBranchLevel(node, childIdx, subtree);
        if (d == depth && node->numKeys == 0) {
          // special case: the root node is empty now, so the returned
          // child becomes the new root and the tree loses one level
          rootNode = subtree;
          depth = depth - 1;
        }
      }
      return removed;
    }

    // d == 2: the next level is the lowest branch level
    auto lowestbranch = node->children[childIdx].lowestbranch;
    assert(lowestbranch != nullptr);
    const bool removed = eraseFromLowestBranchNode(lowestbranch, key);

    // NOTE(review): this threshold is derived from M while the branch path
    // above uses N - confirm that this is intended for lowest branch nodes
    const unsigned int minKeys = (M + 1) / 2;
    if (lowestbranch->numKeys < minKeys) {
      // handle underflow; continue with the node handed back by the handler
      lowestbranch = underflowAtLowestBranchLevel(node, childIdx, lowestbranch);
      if (d == depth && node->numKeys == 0) {
        // special case: the root node is empty now
        rootNode = lowestbranch;
        depth = depth - 1;
      }
    }
    return removed;
  }

  /**
   * Handle the case that during a delete operation a underflow at node @c leaf
   * occured. If possible this is handled
For faster browsing, not all history is shown. View entire blame