FPTree.hpp 44.3 KB
Newer Older
1
/*
2
 * Copyright (C) 2017-2019 DBIS Group - TU Ilmenau, All Rights Reserved.
3
 *
4
 * This file is part of our NVM-based Data Structures repository.
5
 *
6
7
8
 * This program is free software: you can redistribute it and/or modify it under the terms of the
 * GNU General Public License as published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
9
 *
10
11
12
 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 * See the GNU General Public License for more details.
13
 *
14
15
 * You should have received a copy of the GNU General Public License along with this program.
 * If not, see <http://www.gnu.org/licenses/>.
16
17
18
19
20
21
 */

#ifndef DBIS_FPTree_hpp_
#define DBIS_FPTree_hpp_

#include <array>
22
#include <bitset>
23
#include <cmath>
24
25
26
27
28
29
30
31
#include <iostream>

#include <libpmemobj++/make_persistent.hpp>
#include <libpmemobj++/p.hpp>
#include <libpmemobj++/persistent_ptr.hpp>
#include <libpmemobj++/transaction.hpp>
#include <libpmemobj++/utils.hpp>

32
#include "config.h"
33
34
#include "utils/BitOperations.hpp"
#include "utils/SearchFunctions.hpp"
35
#include "utils/PersistEmulation.hpp"
36

37
namespace dbis::pbptrees {
38
39
40
41
42
43

using pmem::obj::delete_persistent;
using pmem::obj::make_persistent;
using pmem::obj::p;
using pmem::obj::persistent_ptr;
using pmem::obj::transaction;
44
45
/// Shorthand used throughout this file for pmem::obj::persistent_ptr<Object>.
template <typename Object>
using pptr = persistent_ptr<Object>;
46
47
48
49
50
51
52
53
54
55
56

/**
 * A persistent memory implementation of a FPTree.
 *
 * @tparam KeyType the data type of the key
 * @tparam ValueType the data type of the values associated with the key
 * @tparam N the maximum number of keys on a branch node
 * @tparam M the maximum number of keys on a leaf node
 */
template<typename KeyType, typename ValueType, int N, int M>
class FPTree {
57
  /// we need at least three keys on a branch node to be able to split
58
  static_assert(N > 2, "number of branch keys has to be >2.");
59
60
  /// we need an even order for branch nodes to be able to merge
  static_assert(N % 2 == 0, "order of branch nodes must be even.");
61
  /// we need at least one key on a leaf node
62
63
64
65
66
67
68
69
  static_assert(M > 0, "number of leaf keys should be >0.");

#ifndef UNIT_TESTS
  private:
#else
  public:
#endif

70
  /// Forward declarations
71
72
73
  struct LeafNode;
  struct BranchNode;

74
75
  struct Node {
    Node() : tag(BLANK) {};
76

77
    Node(const pptr<LeafNode> &leaf_) : tag(LEAF), leaf(leaf_) {};
78

79
    Node(BranchNode *branch_) : tag(BRANCH), branch(branch_) {};
80

81
    Node(const Node &other) { copy(other); };
82

83
    void copy(const Node &other) throw() {
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
      tag = other.tag;

      switch (tag) {
        case LEAF: {
          leaf = other.leaf;
          break;
        }
        case BRANCH: {
          branch = other.branch;
          break;
        }
        default: break;
      }
    }

99
    Node &operator=(Node other) {
100
101
102
103
104
      copy(other);
      return *this;
    }

    enum NodeType {
105
      BLANK, LEAF, BRANCH
106
107
    } tag;
    union {
108
      pptr<LeafNode> leaf;
109
110
111
112
      BranchNode *branch;
    };
  };

113
  /**
   * Per-leaf search helper: a validity bitmap plus one fingerprint byte per slot. Lookups in a
   * leaf compare the fingerprint and the validity bit of a slot before comparing the full key.
   */
  struct alignas(64) LeafSearch {
    std::bitset<M> b;           ///< bit i is set iff slot i holds a valid entry
    std::array<uint8_t, M> fp;  ///< fp[i] is the fingerprint (fpHash) of the key stored in slot i
  };

118
119
120
  /**
   * A structure for representing a leaf node of a B+ tree.
   */
121
122
123
124
125
  struct alignas(64) LeafNode {

    static constexpr auto NUM_KEYS = M;
    using KEY_TYPE = KeyType;

126
127
128
    /**
     * Constructor for creating a new empty leaf node.
     */
129
130
    LeafNode() : nextLeaf(nullptr), prevLeaf(nullptr) {}

131
132
133
134
135
    p<LeafSearch> search;               ///< helper structure for faster searches
    p<std::array<KeyType, M>> keys;     ///< the actual keys
    p<std::array<ValueType, M>> values; ///< the actual values
    pptr<LeafNode> nextLeaf;            ///< pointer to the subsequent sibling
    pptr<LeafNode> prevLeaf;            ///< pointer to the preceeding sibling
136
137
138
139
140
  };

  /**
   * A structure for representing an branch node (branch node) of a B+ tree.
   */
141
142
143
144
  struct alignas(64) BranchNode {

    static constexpr auto NUM_KEYS = N;

145
146
147
148
149
    /**
     * Constructor for creating a new empty branch node.
     */
    BranchNode() : numKeys(0) {}

150
151
152
    unsigned int numKeys;             ///< the number of currently stored keys
    std::array<KeyType, N> keys;      ///< the actual keys
    std::array<Node, N + 1> children; ///< pointers to child nodes (BranchNode or LeafNode)
153
154
155
156
157
  };

  /**
   * Create a new empty leaf node
   */
158
  pptr<LeafNode> newLeafNode() {
159
    auto pop = pmem::obj::pool_by_vptr(this);
160
161
    pptr<LeafNode> newNode = nullptr;
    transaction::run(pop, [&] { newNode = make_persistent<LeafNode>(); });
162
163
164
    return newNode;
  }

165
  /**
   * Allocate a new leaf node as a copy of an existing one. The allocation runs inside a
   * transaction on the pool that contains this tree object.
   *
   * @param other the leaf whose content is copied into the new leaf
   * @return persistent pointer to the newly created copy
   */
  pptr<LeafNode> newLeafNode(const pptr<LeafNode> &other) {
    pptr<LeafNode> leafCopy = nullptr;
    auto pool = pmem::obj::pool_by_vptr(this);
    transaction::run(pool, [&leafCopy, &other] { leafCopy = make_persistent<LeafNode>(*other); });
    return leafCopy;
  }

172
  void deleteLeafNode(const pptr<LeafNode> &node) {
173
    auto pop = pmem::obj::pool_by_vptr(this);
174
    transaction::run(pop, [&] { delete_persistent<LeafNode>(node); });
175
176
177
178
179
180
  }

  /**
   * Allocate a new, empty branch node with plain new. The returned pointer has to be released
   * with deleteBranchNode.
   *
   * @return pointer to the newly created branch node
   */
  BranchNode *newBranchNode() {
    BranchNode *branch = new BranchNode();
    return branch;
  }

184
  /**
   * Release a branch node that was created by newBranchNode.
   *
   * @param node the branch node to be released (deleting a nullptr is a no-op)
   */
  void deleteBranchNode(const BranchNode * const node) {
    if (node != nullptr) delete node;
  }

  /**
189
   * A structure for passing information about a node split to the caller.
190
191
   */
  struct SplitInfo {
192
193
194
    KeyType key;     ///< the key at which the node was split
    Node leftChild;  ///< the resulting lhs child node
    Node rightChild; ///< the resulting rhs child node
195
196
  };

197
198
  unsigned int depth;      /**< the depth of the tree, i.e. the number of levels
                                (0 => rootNode is LeafNode) */
199

200
201
  Node rootNode;           ///< pointer to the root node
  pptr<LeafNode> leafList; ///< pointer to the leaf at the most left position (for recovery)
202

203
204
205
  /* -------------------------------------------------------------------------------------------- */

 public:
206
207
208
209
  /**
   * Iterator for iterating over the leaf nodes
   */
  class iterator {
210
211
    pptr<LeafNode> currentNode;
    unsigned int currentPosition;
212
213
214

    public:
    iterator() : currentNode(nullptr), currentPosition(0) {}
215
216
    iterator(const Node &root, unsigned int d) {
      /// traverse to left-most key
217
      auto node = root;
218
      while (d-- > 0) node = node.branch->children[0];
219
220
      currentNode = node.leaf;
      currentPosition = 0;
221
      const auto &nodeBits = currentNode->search.get_ro().b;
222
      /// Can not overflow as there are at least M/2 entries
223
      while(!nodeBits.test(currentPosition)) ++currentPosition;
224
225
226
    }

    iterator& operator++() {
227
      if (currentPosition >= M-1) {
228
229
        currentNode = currentNode->nextLeaf;
        currentPosition = 0;
230
        if (currentNode == nullptr) return *this;
231
232
        const auto &nodeBits = currentNode->search.get_ro().b;
        while(!nodeBits.test(currentPosition)) ++currentPosition;
233
      } else if (!currentNode->search.get_ro().b.test(++currentPosition)) ++(*this);
234
235
236
      return *this;
    }

237
238
239
240
241
242
243
244
245
246
247
    iterator operator++(int) {
      iterator retval = *this;
      ++(*this);
      return retval;
    }

    bool operator==(iterator other) const {
      return (currentNode == other.currentNode && currentPosition == other.currentPosition);
    }

    bool operator!=(iterator other) const { return !(*this == other); }
248
249

    std::pair<KeyType, ValueType> operator*() {
250
251
252
      const auto nodeRef = *currentNode;
      return std::make_pair(nodeRef.keys.get_ro()[currentPosition],
                            nodeRef.values.get_ro()[currentPosition]);
253
254
    }

255
    /// iterator traits
256
257
258
259
260
261
262
263
    using difference_type = long;
    using value_type = std::pair<KeyType, ValueType>;
    using pointer = const std::pair<KeyType, ValueType>*;
    using reference = const std::pair<KeyType, ValueType>&;
    using iterator_category = std::forward_iterator_tag;
  };
  iterator begin() { return iterator(rootNode, depth); }
  iterator end() { return iterator(); }
264
265
266
267
268
269
270
271

  /* -------------------------------------------------------------------------------------------- */

  /**
   * Callback type accepted by the scan methods. It is invoked once per visited entry with the
   * key and the value of that entry.
   */
  using ScanFunc = std::function<void(const KeyType &key, const ValueType &val)>;

272
273
274
275
276
  /**
   * Constructor for creating a new  tree.
   */
  FPTree() {
    rootNode = newLeafNode();
277
278
    leafList = rootNode.leaf;
    depth = 0;
279
280
    LOG("created new FPTree with sizeof(BranchNode) = " << sizeof(BranchNode) <<
        ", sizeof(LeafNode) = " << sizeof(LeafNode));
281
282
283
284
285
  }

  /**
   * Destructor for the tree. Should delete all allocated nodes.
   */
286
  /// NOTE(review): the body is empty, so despite the description above no node is released here.
  ~FPTree() {}
287
288

  /**
289
290
   * Insert an element (a key-value pair) into the tree. If the key @c key already exists, the
   * corresponding value is replaced by @c val.
291
292
293
294
295
   *
   * @param key the key of the element to be inserted
   * @param val the value that is associated with the key
   */
  void insert(const KeyType &key, const ValueType &val) {
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
    SplitInfo splitInfo;

    bool wasSplit = false;
    if (depth == 0) {
      /// the root node is a leaf node
      auto n = rootNode.leaf;
      wasSplit = insertInLeafNode(n, key, val, &splitInfo);
    } else {
      /// the root node is a branch node
      auto n = rootNode.branch;
      wasSplit = insertInBranchNode(n, depth, key, val, &splitInfo);
    }
    if (wasSplit) {
      /// we had an overflow in the node and therefore the node is split
      const auto root = newBranchNode();
      auto &rootRef = *root;
      rootRef.keys[0] = splitInfo.key;
      rootRef.children[0] = splitInfo.leftChild;
      rootRef.children[1] = splitInfo.rightChild;
      ++rootRef.numKeys;
      rootNode.branch = root;
      ++depth;
    }
319
320
321
  }

  /**
322
   * Find the given @c key in the  tree and if found return the corresponding value.
323
324
   *
   * @param key the key we are looking for
325
   * @param[out] val a pointer to memory where the value is stored if the key was found
326
327
328
329
   * @return true if the key was found, false otherwise
   */
  bool lookup(const KeyType &key, ValueType *val)  {
    assert(val != nullptr);
330
331
    const auto leaf = findLeafNode(key);
    const auto pos = lookupPositionInLeafNode(leaf, key);
332
    if (pos < M) {
333
334
335
      /// we found it!
      *val = leaf->values.get_ro()[pos];
      return true;
336
    }
337
    return false;
338
339
340
341
342
343
344
345
346
347
  }

  /**
   * Delete the entry with the given key @c key from the tree.
   *
   * @param key the key of the entry to be deleted
   * @return true if the key was found and deleted
   */
  bool erase(const KeyType &key) {
    bool result;
348
349
350
351
352
353
354
355
356
357
    if (depth == 0) {
      /// special case: the root node is a leaf node and there is no need to handle underflow
      auto node = rootNode.leaf;
      assert(node != nullptr);
      result=eraseFromLeafNode(node, key);
    } else {
      auto node = rootNode.branch;
      assert(node != nullptr);
      result=eraseFromBranchNode(node, depth, key);
    }
358
359
    return result;
  }
360

361
362
363
  /**
   * Recover the FPTree by iterating over the LeafList and using the recoveryInsert method.
   */
364
  void recover() {
365
    LOG("Starting RECOVERY of FPTree");
366
    pptr<LeafNode> currentLeaf = leafList;
367
    if (leafList == nullptr) {
368
      LOG("No data to recover FPTree");
369
      return;
370
    }
371
372
373
374
375
376
377
378
379
380
381
382
    /* counting leafs */
    auto leafs = 0u;
    while(currentLeaf != nullptr) {
      ++leafs;
      currentLeaf = currentLeaf->nextLeaf;
    }
    float x = std::log(leafs)/std::log(N+1);
    assert(x == int(x) && "Not supported for this amount of leafs, yet");

    /* actual recovery */
    currentLeaf = leafList;
    if (leafList->nextLeaf == nullptr) {
383
      /// The index has only one node, so the leaf node becomes the root node
384
385
      rootNode = leafList;
      depth = 0;
386
    } else {
387
      rootNode = newBranchNode();
388
      depth = 1;
389
390
      rootNode.branch->children[0] = currentLeaf;
      currentLeaf = currentLeaf->nextLeaf;
391
392
393
394
395
      while (currentLeaf != nullptr) {
        recoveryInsert(currentLeaf);
        currentLeaf = currentLeaf->nextLeaf;
      }
    }
396
    LOG("RECOVERY Done")
397
398
399
400
401
402
  }

  /**
   * Print the structure and content of the tree to stdout.
   */
  void print() const {
403
404
    if (depth == 0) printLeafNode(0, rootNode.leaf);
    else printBranchNode(0u, rootNode.branch);
405
406
407
408
409
410
411
412
  }

  /**
   * Perform a scan over all key-value pairs stored in the tree.
   * For each entry the given function @func is called.
   *
   * @param func the function called for each entry
   */
413
  void scan(ScanFunc func) const {
414
    /// we traverse to the leftmost leaf node
415
416
    auto node = rootNode;
    auto d = depth;
417
    while ( d-- > 0) node = node.branch->children[0];
418
419
    auto leaf = node.leaf;
    while (leaf != nullptr) {
420
      auto &leafRef = *leaf;
421
      /// for each key-value pair call func
422
423
424
425
426
427
428
      const auto &leafBits = leafRef.search.get_ro().b;
      const auto &leafKeys = leafRef.keys.get_ro();
      const auto &leafValues = leafRef.values.get_ro();
      for (auto i = 0u; i < M; i++) {
        if (!leafBits.test(i)) continue;
        const auto &key = leafKeys[i];
        const auto &val = leafValues[i];
429
430
        func(key, val);
      }
431
432
      /// move to the next leaf node
      leaf = leafRef.nextLeaf;
433
    }
434
  }
435
436

  /**
437
438
   * Perform a range scan over all elements within the range [minKey, maxKey] and for each element
   * call the given function @c func.
439
440
441
442
443
   *
   * @param minKey the lower boundary of the range
   * @param maxKey the upper boundary of the range
   * @param func the function called for each entry
   */
444
  void scan(const KeyType &minKey, const KeyType &maxKey, ScanFunc func) const {
445
446
447
448
    auto leaf = findLeafNode(minKey);

    bool higherThanMax = false;
    while (!higherThanMax && leaf != nullptr) {
449
450
      /// for each key-value pair within the range call func
      const auto leafRef = *leaf;
451
452
453
      const auto &leafBits = leafRef.search.get_ro().b;
      const auto &leafKeys = leafRef.keys.get_ro();
      const auto &leafValues = leafRef.values.get_ro();
454
      for (auto i = 0u; i < M; i++) {
455
456
        if (!leafBits.test(i)) continue;
        auto &key = leafKeys[i];
457
458
459
        if (key < minKey) continue;
        if (key > maxKey) { higherThanMax = true; continue; }

460
        auto &val = leafValues[i];
461
462
        func(key, val);
      }
463
464
      /// move to the next leaf node
      leaf = leafRef.nextLeaf;
465
    }
466
  }
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481

#ifndef UNIT_TESTS
  private:
#endif

  /**
   * Insert a (key, value) pair into the corresponding leaf node. It is the
   * responsibility of the caller to make sure that the node @c node is
   * the correct node. The key is inserted at the correct position.
   *
   * @param node the node where the key-value pair is inserted.
   * @param key the key to be inserted
   * @param val the value associated with the key
   * @param splitInfo information about a possible split of the node
   */
482
  bool insertInLeafNode(const pptr<LeafNode> &node, const KeyType &key,
483
      const ValueType &val, SplitInfo *splitInfo) {
484
    auto &nodeRef = *node;
485
486
487
    bool split = false;
    auto pos = lookupPositionInLeafNode(node, key);

488
    if (pos < M) {
489
      /// handle insert of duplicates
490
      nodeRef.values.get_rw()[pos] = val;
491
      return false;
492
    }
493
    pos = BitOperations::getFreeZero(nodeRef.search.get_ro().b);
494
    if (pos == M) {
495
496
497
498
499
500
501
      /* split the node */
      splitLeafNode(node, splitInfo);
      auto &splitRef = *splitInfo;
      auto sibling= splitRef.rightChild.leaf;
      auto &sibRef= *sibling;

      /* insert the new entry */
502
503
504
      if (key > splitRef.key) {
        insertInLeafNodeAtPosition(sibling, BitOperations::getFreeZero(sibRef.search.get_ro().b), key, val);
      } else {
505
        if (key > findMaxKey(nodeRef.keys.get_ro(), nodeRef.search.get_ro().b)) {
506
507
508
509
510
511
512
          /// Special case: new key would be the middle, thus must be right
          insertInLeafNodeAtPosition(sibling, BitOperations::getFreeZero(sibRef.search.get_ro().b), key, val);
          splitRef.key = key;
        } else {
          insertInLeafNodeAtPosition(node, BitOperations::getFreeZero(nodeRef.search.get_ro().b), key, val);
        }
      }
513
      /* inform the caller about the split */
514
515
      split = true;
    } else {
516
      /* otherwise, we can simply insert the new entry at the given position */
517
518
519
520
521
      insertInLeafNodeAtPosition(node, pos, key, val);
    }
    return split;
  }

522
  void splitLeafNode(const pptr<LeafNode> &node, SplitInfo *splitInfo) {
523
524
525
526
      auto &nodeRef = *node;

      /* determine the split position by finding median in unsorted array of keys*/
      auto data = nodeRef.keys.get_ro();
527
528
      auto [bitmap, splitPos] = findSplitKey(data);
      const auto &splitKey = data[splitPos];
529

530
      /// copy leaf
531
      const auto sibling = newLeafNode(node);
532
      auto &sibRef = *sibling;
533
534
535
      nodeRef.search.get_rw().b = bitmap;
      sibRef.search.get_rw().b = bitmap.flip();
      PersistEmulation::writeBytes<sizeof(LeafNode) + ((2*M+7)>>3)>(); /// copy leaf + 2 bitmaps
536

537
538
539
      /// Alternative: move instead of complete copy
      /*
      const auto sibling = newLeafNode();
540
      auto &sibRef = *sibling;
541
542
543
544
545
546
      auto &sibSearch = sibRef.search.get_rw();
      auto &sibKeys = sibRef.keys.get_rw();
      auto &sibValues = sibRef.values.get_rw();
      auto &nodeSearch = nodeRef.search.get_rw();
      auto &nodeKeys = nodeRef.keys.get_ro();
      auto &nodeValues = nodeRef.values.get_ro();
547
548
      auto j = 0u;
      for(auto i = 0u; i < M; i++) {
549
550
551
552
553
554
        if(nodeKeys[i] > splitKey) {
          sibKeys[j] = nodeKeys[i];
          sibValues[j] = nodeValues[i];
          sibSearch.fp[j] = nodeSearch.fp[i];
          sibSearch.b.set(j);
          nodeSearch.b.reset(i);
555
556
557
          j++;
        }
      }
558
559
      PersistEmulation::writeBytes(j * (sizeof(KeyType) + sizeof(ValueType) + 1) + ((j*2+7)>>3)); /// j entries/hashes + j*2 bits
      */
560

561
      /// setup the list of leaf nodes
562
563
564
      if (nodeRef.nextLeaf != nullptr) {
        sibRef.nextLeaf = nodeRef.nextLeaf;
        nodeRef.nextLeaf->prevLeaf = sibling;
565
        PersistEmulation::writeBytes<16*2>();
566
567
568
      }
      nodeRef.nextLeaf = sibling;
      sibRef.prevLeaf = node;
569
      PersistEmulation::writeBytes<16*2>();
570

571
      /// set split information
572
573
574
575
576
577
      auto &splitRef = *splitInfo;
      splitRef.leftChild = node;
      splitRef.rightChild = sibling;
      splitRef.key = splitKey;
  }

578
579
580
581
582
583
584
585
586
587
588
589
  /**
   * Insert a (key, value) pair at the given position @c pos into the leaf node
   * @c node. The caller has to ensure that
   * - there is enough space to insert the element
   * - the key is inserted at the correct position according to the order of
   * keys
   *
   * @oaram node the leaf node where the element is to be inserted
   * @param pos the position in the leaf node (0 <= pos <= numKeys < M)
   * @param key the key of the element
   * @param val the actual value corresponding to the key
   */
590
  void insertInLeafNodeAtPosition(const pptr<LeafNode> &node, const unsigned int pos,
591
592
      const KeyType &key, const ValueType &val) {
    assert(pos < M);
593
    auto &nodeRef = *node;
594

595
596
597
    /// insert the new entry at the given position
    nodeRef.keys.get_rw()[pos] = key;
    nodeRef.values.get_rw()[pos] = val;
598

599
    /// set bit and hash
600
601
602
603
    auto &nodeSearch = nodeRef.search.get_rw();
    nodeSearch.b.set(pos);
    nodeSearch.fp[pos] = fpHash(key);
    PersistEmulation::writeBytes<sizeof(KeyType) + sizeof(ValueType) + 2>();
604
605
606
607
608
609
610
  }

  /**
   * Insert a (key, value) pair into the tree recursively by following the path
   * down to the leaf level starting at node @c node at depth @c depth.
   *
   * @param node the starting node for the insert
611
   * @param d the current depth of the tree (0 == leaf level)
612
613
614
615
616
   * @param key the key of the element
   * @param val the actual value corresponding to the key
   * @param splitInfo information about the split
   * @return true if a split was performed
   */
617
618
  bool insertInBranchNode(BranchNode * const node, unsigned int d, const KeyType &key,
                          const ValueType &val, SplitInfo *splitInfo) {
619
620
    SplitInfo childSplitInfo;
    bool split = false, hasSplit = false;
621
    auto &nodeRef = *node;
622
623

    auto pos = lookupPositionInBranchNode(node, key);
624
625
626
    if (d == 1) {
      /// case #1: our children are leaf nodes
      auto child = nodeRef.children[pos].leaf;
627
      hasSplit = insertInLeafNode(child, key, val, &childSplitInfo);
628
    } else {
629
630
631
      /// case #2: our children are branch nodes
      auto child = nodeRef.children[pos].branch;
      hasSplit = insertInBranchNode(child, d - 1, key, val, &childSplitInfo);
632
633
    }
    if (hasSplit) {
634
635
636
637
      auto host = node;
      /// the child node was split, thus we have to add a new entry
      /// to our branch node
      if (nodeRef.numKeys == N) {
638
        splitBranchNode(node, childSplitInfo.key, splitInfo);
639
640
        const auto &splitRef = *splitInfo;
        host = (key < splitRef.key ? splitRef.leftChild : splitRef.rightChild).branch;
641
642
643
        split = true;
        pos = lookupPositionInBranchNode(host, key);
      }
644
645
646
647
648
649
650
651
      auto &hostRef = *host;
      if (pos < hostRef.numKeys) {
        /// if the child isn't inserted at the rightmost position
        /// then we have to make space for it
        hostRef.children[hostRef.numKeys + 1] = hostRef.children[hostRef.numKeys];
        for (auto i = hostRef.numKeys; i > pos; i--) {
          hostRef.children[i] = hostRef.children[i - 1];
          hostRef.keys[i] = hostRef.keys[i - 1];
652
653
        }
      }
654
655
656
657
658
      /// finally, add the new entry at the given position
      hostRef.keys[pos] = childSplitInfo.key;
      hostRef.children[pos] = childSplitInfo.leftChild;
      hostRef.children[pos + 1] = childSplitInfo.rightChild;
      hostRef.numKeys = hostRef.numKeys + 1;
659
660
661
662
663
    }
    return split;
  }

  /**
664
665
   * Split the given branch node @c node in the middle and move half of the keys/children to the new
   * sibling node.
666
667
668
669
670
   *
   * @param node the branch node to be split
   * @param splitKey the key on which the split of the child occured
   * @param splitInfo information about the split
   */
671
672
673
674
675
676
677
678
679
680
681
682
683
684
  void splitBranchNode(BranchNode * const node, const KeyType &splitKey, SplitInfo *splitInfo) {
    auto &nodeRef = *node;
    /// determine the split position
    auto middle = (N + 1) / 2;
    /// adjust the middle based on the key we have to insert
    if (splitKey > nodeRef.keys[middle]) middle++;

    /// move all entries behind this position to a new sibling node
    const auto sibling = newBranchNode();
    auto &sibRef = *sibling;
    sibRef.numKeys = nodeRef.numKeys - middle;
    for (auto i = 0u; i < sibRef.numKeys; i++) {
      sibRef.keys[i] = nodeRef.keys[middle + i];
      sibRef.children[i] = nodeRef.children[middle + i];
685
    }
686
687
    sibRef.children[sibRef.numKeys] = nodeRef.children[nodeRef.numKeys];
    nodeRef.numKeys = middle - 1;
688

689
690
691
692
    auto &splitRef = *splitInfo;
    splitRef.key = nodeRef.keys[middle - 1];
    splitRef.leftChild = node;
    splitRef.rightChild = sibling;
693
694
695
  }

  /**
696
697
698
   * Traverse the tree starting at the root until the leaf node is found that could contain the
   * given @key. Note, that always a leaf node is returned even if the key doesn't exist on this
   * node.
699
700
701
702
   *
   * @param key the key we are looking for
   * @return the leaf node that would store the key
   */
703
  pptr<LeafNode> findLeafNode(const KeyType &key) const {
704
705
    auto node = rootNode;
    auto d = depth;
706
    while (d-- > 0) {
707
708
709
710
711
712
713
      auto n = node.branch;
      auto pos = lookupPositionInBranchNode(n, key);
      node = n->children[pos];
    }
    return node.leaf;
  }
  /**
714
   * Lookup the search key @c key in the given leaf node and return the position.
715
   * If the search key was not found, then @c M is returned.
716
717
718
   *
   * @param node the leaf node where we search
   * @param key the search key
719
   * @return the position of the key  (or @c M if not found)
720
   */
721
722
  auto lookupPositionInLeafNode(const pptr<LeafNode> &node, const KeyType &key) const {
    auto pos = 0u;
Philipp Götze's avatar
Philipp Götze committed
723
    const auto &nodeRef = *node;
724
    const auto hash = fpHash(key);
Philipp Götze's avatar
Philipp Götze committed
725
726
    const auto &keys = nodeRef.keys.get_ro();
    const auto &search = nodeRef.search.get_ro();
727
    for (; pos < M ; pos++) {
728
729
730
      if(search.fp[pos] == hash &&
         search.b.test(pos) &&
         keys[pos] == key)
731
732
        break;
    }
733
734
735
736
    return pos;
  }

  /**
737
738
739
   * Lookup the search key @c key in the given branch node and return the position which is the
   * position in the list of keys + 1. in this way, the position corresponds to the position of the
   * child pointer in the array @children.
740
741
742
   * If the search key is less than the smallest key, then @c 0 is returned.
   * If the key is greater than the largest key, then @c numKeys is returned.
   *
743
   * @param node the branch node where we search
744
745
746
   * @param key the search key
   * @return the position of the key + 1 (or 0 or @c numKey)
   */
747
748
  auto lookupPositionInBranchNode(const BranchNode * const node, const KeyType &key) const {
    const auto num = node->numKeys;
749
    const auto &keys = node->keys;
750
751
752
    //auto pos = 0u;
    //for (; pos < num && keys[pos] <= key; ++pos);
    //return pos;
753
    return binarySearch<true>(keys, 0, num - 1, key);
754
755
756
757
758
759
760
761
762
  }

  /**
   * Delete the element with the given key from the given leaf node.
   *
   * @param node the leaf node from which the element is deleted
   * @param key the key of the element to be deleted
   * @return true of the element was deleted
   */
763
  bool eraseFromLeafNode(const pptr<LeafNode> &node, const KeyType &key) {
764
    auto pos = lookupPositionInLeafNode(node, key);
765
    if (pos < M) {
766
      node->search.get_rw().b.reset(pos);
767
      return true;
768
    }
769
    return false;
770
771
772
  }

  /**
773
774
   * Delete an entry from the tree by recursively going down to the leaf level and handling the
   * underflows.
775
776
777
778
779
780
   *
   * @param node the current branch node
   * @param d the current depth of the traversal
   * @param key the key to be deleted
   * @return true if the entry was deleted
   */
781
782
  bool eraseFromBranchNode(BranchNode * const node, unsigned int d, const KeyType &key) {
    auto &nodeRef = *node;
783
784
    assert(d >= 1);
    bool deleted = false;
785
    /// try to find the branch
786
    auto pos = lookupPositionInBranchNode(node, key);
787
    if (d == 1) {
788
789
      /// the next level is the leaf level
      auto leaf = nodeRef.children[pos].leaf;
790
791
      assert(leaf != nullptr);
      deleted = eraseFromLeafNode(leaf, key);
792
      constexpr auto middle = (M + 1) / 2;
793
      if (leaf->search.get_ro().b.count() < middle) {
794
        /// handle underflow
795
        underflowAtLeafLevel(node, pos, leaf);
796
797
      }
    } else {
798
      auto child = nodeRef.children[pos].branch;
799
800
801
      deleted = eraseFromBranchNode(child, d - 1, key);

      pos = lookupPositionInBranchNode(node, key);
802
      constexpr auto middle = (N + 1) / 2;
803
      if (child->numKeys < middle) {
804
        /// handle underflow
805
        child = underflowAtBranchLevel(node, pos, child);
806
807
        if (d == depth && nodeRef.numKeys == 0) {
          /// special case: the root node is empty now
808
          rootNode = child;
809
          --depth;
810
811
812
813
814
815
816
        }
      }
    }
    return deleted;
  }

  /**
817
818
   * Handle the case that during a delete operation a underflow at node @c leaf occured.
   * If possible this is handled
819
820
821
822
   * (1) by rebalancing the elements among the leaf node and one of its siblings
   * (2) if not possible by merging with one of its siblings.
   *
   * @param node the parent node of the node where the underflow occured
823
   * @param pos the position of the child node @leaf in the @c children array of the branch node
824
825
   * @param leaf the node at which the underflow occured
   */
826
827
828
  void underflowAtLeafLevel(BranchNode * const node, unsigned int pos, const pptr<LeafNode> &leaf) {
    /// Strategy: first try to borrow entries from a sibling that hangs below the same parent
    /// @c node (the left sibling is tried first); if neither sibling can spare entries, merge
    /// with one of them and drop one separator key and one child slot from @c node.
    auto &nodeRef = *node;
    auto &leafRef = *leaf;
    assert(pos <= nodeRef.numKeys);
    constexpr auto middle = (M + 1) / 2;

    /// 1. we check whether we can rebalance with one of the siblings but only if both nodes have
    ///    the same direct parent (pos > 0: left sibling, pos < numKeys: right sibling)
    if (pos > 0 && leafRef.prevLeaf->search.get_ro().b.count() > middle) {
      /// we have a sibling at the left for rebalancing the keys
      balanceLeafNodes(leafRef.prevLeaf, leaf);
      /// the separator left of this leaf becomes the leaf's new smallest key
      const auto &leafKeys = leafRef.keys.get_ro();
      nodeRef.keys[pos - 1] = leafKeys[findMinKey(leafKeys, leafRef.search.get_ro().b)];
    } else if (pos < nodeRef.numKeys && leafRef.nextLeaf->search.get_ro().b.count() > middle) {
      /// we have a sibling at the right for rebalancing the keys
      balanceLeafNodes(leafRef.nextLeaf, leaf);
      /// the separator right of this leaf becomes the right sibling's new smallest key
      auto &nextLeaf = *leafRef.nextLeaf;
      const auto &nextLeafKeys = nextLeaf.keys.get_ro();
      nodeRef.keys[pos] = nextLeafKeys[findMinKey(nextLeafKeys, nextLeaf.search.get_ro().b)];
    } else {
      /// 2. if this fails we have to merge two leaf nodes but only if both nodes have the same
      ///    direct parent
      pptr<LeafNode> survivor = nullptr;
      if (pos > 0 && leafRef.prevLeaf->search.get_ro().b.count() <= middle) {
        survivor = mergeLeafNodes(leafRef.prevLeaf, leaf);
        /// NOTE: leafRef must not be used after this point
        deleteLeafNode(leaf);
      } else if (pos < nodeRef.numKeys && leafRef.nextLeaf->search.get_ro().b.count() <= middle) {
        /// because we update the pointers in mergeLeafNodes we keep it here
        auto l = leafRef.nextLeaf;
        survivor = mergeLeafNodes(leaf, l);
        deleteLeafNode(l);
      } else assert(false); ///< this shouldn't happen?!

      if (nodeRef.numKeys > 1) {
        /// Within this branch pos > 0 implies that the merge went into the left sibling (the
        /// left rebalancing test above failed, i.e. it holds at most `middle` entries), so the
        /// survivor lives one slot to the left of the removed leaf.
        if (pos > 0) pos--;
        /// just remove the child node from the current branch node: drop separator keys[pos]
        /// and child slot pos + 1, shifting everything behind them one position to the left
        for (auto i = pos; i < nodeRef.numKeys - 1; i++) {
          nodeRef.keys[i] = nodeRef.keys[i + 1];
          nodeRef.children[i + 1] = nodeRef.children[i + 2];
        }
        nodeRef.children[pos] = survivor;
        --nodeRef.numKeys;
      } else {
        /// This is a special case that happens only if the current node is the root node.
        /// Now, we have to replace the branch root node by a leaf node.
        rootNode = survivor;
        --depth;
      }
    }
  }

  /**
   * Handle the case that during a delete operation an underflow at node @c child occurred where
   * @c node is the parent node. If possible this is handled
   * (1) by rebalancing the elements among the node @c child and one of its siblings
   * (2) if not possible by merging with one of its siblings.
   *
   * @param node the parent node of the node where the underflow occurred
   * @param pos the position of the child node @c child in the @c children array of the branch node
   * @param child the node at which the underflow occurred
   * @return the (possibly new) child node (in case of a merge)
   */
  BranchNode* underflowAtBranchLevel(BranchNode * const node, unsigned int pos,
                                     BranchNode * const child) {
    assert(node != nullptr);
    assert(child != nullptr);
    auto &nodeRef = *node;
    auto newChild = child;
    constexpr auto middle = (N + 1) / 2;

    /// 1. we check whether we can rebalance with one of the siblings
    if (pos > 0 && nodeRef.children[pos - 1].branch->numKeys > middle) {
      /// we have a sibling at the left for rebalancing the keys
      auto &sibling = nodeRef.children[pos - 1].branch;
      balanceBranchNodes(sibling, child, node, pos - 1);
      return newChild;
    }
    if (pos < nodeRef.numKeys && nodeRef.children[pos + 1].branch->numKeys > middle) {
      /// we have a sibling at the right for rebalancing the keys
      auto &sibling = nodeRef.children[pos + 1].branch;
      balanceBranchNodes(sibling, child, node, pos);
      return newChild;
    }

    /// 2. if this fails we have to merge two branch nodes
    BranchNode *lSibling = nullptr, *rSibling = nullptr;
    unsigned int prevKeys = 0, nextKeys = 0;
    if (pos > 0) {
      lSibling = nodeRef.children[pos - 1].branch;
      prevKeys = lSibling->numKeys;
    }
    if (pos < nodeRef.numKeys) {
      rSibling = nodeRef.children[pos + 1].branch;
      nextKeys = rSibling->numKeys;
    }

    BranchNode *witnessNode = nullptr;  ///< the node that was emptied by the merge
    auto keyPos = pos;                  ///< index of the separator key to drop from @c node
    auto childPos = pos;                ///< index of the child slot to drop from @c node
    if (prevKeys > 0) {
      /// child is appended to its left sibling: separator pos - 1 and child slot pos vanish
      mergeBranchNodes(lSibling, nodeRef.keys[pos - 1], child);
      keyPos = pos - 1;
      witnessNode = child;
      newChild = lSibling;
    } else if (nextKeys > 0) {
      /// the right sibling is appended to child: separator pos and child slot pos + 1 vanish.
      /// The slot of the right sibling (not the slot of the surviving child) has to be removed,
      /// independent of whether pos is 0 or not.
      mergeBranchNodes(child, nodeRef.keys[pos], rSibling);
      childPos = pos + 1;
      witnessNode = rSibling;
    } else {
      /// shouldn't happen; nothing was merged, so leave the parent untouched
      assert(false);
      return newChild;
    }

    /// remove the separator key from node
    for (auto i = keyPos; i < nodeRef.numKeys - 1; i++) {
      nodeRef.keys[i] = nodeRef.keys[i + 1];
    }
    /// remove the child slot of the emptied node (children holds numKeys + 1 entries)
    for (auto i = childPos; i < nodeRef.numKeys; i++) {
      nodeRef.children[i] = nodeRef.children[i + 1];
    }
    nodeRef.numKeys--;

    deleteBranchNode(witnessNode);
    return newChild;
  }

  /**
   * Redistribute (key, value) pairs from the leaf node @c donor to the leaf node @c receiver such
   * that both nodes have approximately the same number of elements. This method is used in case
   * of an underflow situation of a leaf node.
   *
   * @param donor the leaf node from which the elements are taken
   * @param receiver the sibling leaf node getting the elements from @c donor
   */
  void balanceLeafNodes(const pptr<LeafNode> &donor, const pptr<LeafNode> &receiver) {
    auto &donorRef = *donor;
    auto &receiverRef = *receiver;
    const auto dNumKeys = donorRef.search.get_ro().b.count();
    const auto rNumKeys = receiverRef.search.get_ro().b.count();
964
965
966
    assert(dNumKeys > rNumKeys);
    unsigned int balancedNum = (dNumKeys + rNumKeys) / 2;
    unsigned int toMove = dNumKeys - balancedNum;
967
968
    if (toMove == 0) return;

969
970
971
972
973
974
975
976
    auto &receiverKeys = receiverRef.keys.get_rw();
    auto &receiverValues = receiverRef.values.get_rw();
    auto &receiverSearch = receiverRef.search.get_rw();
    const auto &donorKeys = donorRef.keys.get_ro();
    const auto &donorValues = donorRef.values.get_ro();
    auto &donorSearch = donorRef.search.get_rw();

    if (donorKeys[0] < receiverKeys[0]) {
977
      /// move to a node with larger keys
978
      for (auto i = 0u; i < toMove; ++i) {
979
        const auto max = findMaxKey(donorKeys, donorSearch.b);
980
981
982
983
984
985
        const auto pos = BitOperations::getFreeZero(receiverSearch.b);
        receiverSearch.b.set(pos);
        receiverSearch.fp[pos] = fpHash(donorKeys[max]);
        receiverKeys[pos] = donorKeys[max];
        receiverValues[pos] = donorValues[max];
        donorSearch.b.reset(max);
986
987
      }
    } else {
988
      /// move to a node with smaller keys
989
      for (auto i = 0u; i < toMove; ++i) {
990
        const auto min = findMinKey(donorKeys, donorSearch.b);
991
992
993
994
995
996
        const auto pos = BitOperations::getFreeZero(receiverSearch.b);
        receiverSearch.b.set(pos);
        receiverSearch.fp[pos] = fpHash(donorKeys[min]);
        receiverKeys[pos] = donorKeys[min];
        receiverValues[pos] = donorValues[min];
        donorSearch.b.reset(min);
997
998
999
1000
      }
    }
  }

For faster browsing, not all history is shown. View entire blame