FPTree.hpp 44.8 KB
Newer Older
1
/*
2
 * Copyright (C) 2017-2019 DBIS Group - TU Ilmenau, All Rights Reserved.
3
 *
4
 * This file is part of our NVM-based Data Structures repository.
5
 *
6
7
8
 * This program is free software: you can redistribute it and/or modify it under the terms of the
 * GNU General Public License as published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
9
 *
10
11
12
 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 * See the GNU General Public License for more details.
13
 *
14
15
 * You should have received a copy of the GNU General Public License along with this program.
 * If not, see <http://www.gnu.org/licenses/>.
16
17
18
19
20
21
 */

#ifndef DBIS_FPTree_hpp_
#define DBIS_FPTree_hpp_

#include <array>
22
#include <bitset>
23
#include <cmath>
24
25
26
27
28
29
30
31
#include <iostream>

#include <libpmemobj++/make_persistent.hpp>
#include <libpmemobj++/p.hpp>
#include <libpmemobj++/persistent_ptr.hpp>
#include <libpmemobj++/transaction.hpp>
#include <libpmemobj++/utils.hpp>

32
#include "config.h"
33
#include "utils/ElementOfRankK.hpp"
34
#include "utils/PersistEmulation.hpp"
35
36
37
38
39
40
41
42

namespace dbis::fptree {

using pmem::obj::delete_persistent;
using pmem::obj::make_persistent;
using pmem::obj::p;
using pmem::obj::persistent_ptr;
using pmem::obj::transaction;
43
44
template <typename Object>
using pptr = persistent_ptr<Object>;
45
46
47
48
49
50
51
52
53
54
55

/**
 * A persistent memory implementation of a FPTree.
 *
 * @tparam KeyType the data type of the key
 * @tparam ValueType the data type of the values associated with the key
 * @tparam N the maximum number of keys on a branch node
 * @tparam M the maximum number of keys on a leaf node
 */
template<typename KeyType, typename ValueType, int N, int M>
class FPTree {
56
  /// we need at least three keys on a branch node to be able to split
57
  static_assert(N > 2, "number of branch keys has to be >2.");
58
59
  /// we need an even order for branch nodes to be able to merge
  static_assert(N % 2 == 0, "order of branch nodes must be even.");
60
  /// we need at least one key on a leaf node
61
62
63
64
65
66
67
68
  static_assert(M > 0, "number of leaf keys should be >0.");

#ifndef UNIT_TESTS
  private:
#else
  public:
#endif

69
  /// Forward declarations
70
71
72
  struct LeafNode;
  struct BranchNode;

73
74
  struct Node {
    Node() : tag(BLANK) {};
75

76
    Node(const pptr<LeafNode> &leaf_) : tag(LEAF), leaf(leaf_) {};
77

78
    Node(BranchNode *branch_) : tag(BRANCH), branch(branch_) {};
79

80
    Node(const Node &other) { copy(other); };
81

82
    void copy(const Node &other) throw() {
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
      tag = other.tag;

      switch (tag) {
        case LEAF: {
          leaf = other.leaf;
          break;
        }
        case BRANCH: {
          branch = other.branch;
          break;
        }
        default: break;
      }
    }

98
    Node &operator=(Node other) {
99
100
101
102
103
      copy(other);
      return *this;
    }

    enum NodeType {
104
      BLANK, LEAF, BRANCH
105
106
    } tag;
    union {
107
      pptr<LeafNode> leaf;
108
109
110
111
      BranchNode *branch;
    };
  };

112
  struct alignas(64) LeafSearch {
113
114
    std::bitset<M> b;          ///< bitset for valid entries
    std::array<uint8_t, M> fp; ///< fingerprint array (n & 0xFF)
115
116
117
118
119
120
121
122

    unsigned int getFreeZero() const {
      unsigned int idx = 0;
      while (idx < M && b.test(idx)) ++idx;
      return idx;
    }
  };

123
124
125
126
127
128
129
  /**
   * A structure for representing a leaf node of a B+ tree.
   */
  struct LeafNode {
    /**
     * Constructor for creating a new empty leaf node.
     */
130
131
    LeafNode() : nextLeaf(nullptr), prevLeaf(nullptr) {}

132
133
134
135
136
    p<LeafSearch> search;               ///< helper structure for faster searches
    p<std::array<KeyType, M>> keys;     ///< the actual keys
    p<std::array<ValueType, M>> values; ///< the actual values
    pptr<LeafNode> nextLeaf;            ///< pointer to the subsequent sibling
    pptr<LeafNode> prevLeaf;            ///< pointer to the preceeding sibling
137
138
139
140
141
142
143
144
145
146
147
  };

  /**
   * A structure for representing an branch node (branch node) of a B+ tree.
   */
  struct BranchNode {
    /**
     * Constructor for creating a new empty branch node.
     */
    BranchNode() : numKeys(0) {}

148
149
150
    unsigned int numKeys;             ///< the number of currently stored keys
    std::array<KeyType, N> keys;      ///< the actual keys
    std::array<Node, N + 1> children; ///< pointers to child nodes (BranchNode or LeafNode)
151
152
153
154
155
  };

  /**
   * Create a new empty leaf node
   */
156
  pptr<LeafNode> newLeafNode() {
157
    auto pop = pmem::obj::pool_by_vptr(this);
158
159
    pptr<LeafNode> newNode = nullptr;
    transaction::run(pop, [&] { newNode = make_persistent<LeafNode>(); });
160
161
162
    return newNode;
  }

163
  pptr<LeafNode> newLeafNode(const pptr<LeafNode> &other) {
164
    auto pop = pmem::obj::pool_by_vptr(this);
165
166
    pptr<LeafNode> newNode = nullptr;
    transaction::run(pop, [&] { newNode = make_persistent<LeafNode>(*other); });
167
168
169
    return newNode;
  }

170
  void deleteLeafNode(const pptr<LeafNode> &node) {
171
    auto pop = pmem::obj::pool_by_vptr(this);
172
    transaction::run(pop, [&] { delete_persistent<LeafNode>(node); });
173
174
175
176
177
178
  }

  /**
   * Create a new empty branch node
   */
  BranchNode *newBranchNode() {
179
    return new BranchNode();
180
181
  }

182
  void deleteBranchNode(const BranchNode * const node) {
183
    delete node;
184
185
186
  }

  /**
187
   * A structure for passing information about a node split to the caller.
188
189
   */
  struct SplitInfo {
190
191
192
    KeyType key;     ///< the key at which the node was split
    Node leftChild;  ///< the resulting lhs child node
    Node rightChild; ///< the resulting rhs child node
193
194
  };

195
196
  unsigned int depth;      /**< the depth of the tree, i.e. the number of levels
                                (0 => rootNode is LeafNode) */
197

198
199
  Node rootNode;           ///< pointer to the root node
  pptr<LeafNode> leafList; ///< pointer to the leaf at the most left position (for recovery)
200

201
202
203
  /* -------------------------------------------------------------------------------------------- */

 public:
204
205
206
207
  /**
   * Iterator for iterating over the leaf nodes
   */
  class iterator {
208
209
    pptr<LeafNode> currentNode;
    unsigned int currentPosition;
210
211
212

    public:
    iterator() : currentNode(nullptr), currentPosition(0) {}
213
214
    iterator(const Node &root, unsigned int d) {
      /// traverse to left-most key
215
      auto node = root;
216
      while (d-- > 0) node = node.branch->children[0];
217
218
      currentNode = node.leaf;
      currentPosition = 0;
219
220
221
      const auto &nodeRef = *currentNode;
      /// Can not overflow as there are at least M/2 entries
      while(!nodeRef.search.get_ro().b.test(currentPosition)) ++currentPosition;
222
223
224
    }

    iterator& operator++() {
225
      if (currentPosition >= M-1) {
226
227
        currentNode = currentNode->nextLeaf;
        currentPosition = 0;
228
        if (currentNode == nullptr) return *this;
229
230
        const auto &nodeRef = *currentNode;
        while(!nodeRef.search.get_ro().b.test(currentPosition)) ++currentPosition;
231
      } else if (!currentNode->search.get_ro().b.test(++currentPosition)) ++(*this);
232
233
234
      return *this;
    }

235
236
237
238
239
240
241
242
243
244
245
    iterator operator++(int) {
      iterator retval = *this;
      ++(*this);
      return retval;
    }

    bool operator==(iterator other) const {
      return (currentNode == other.currentNode && currentPosition == other.currentPosition);
    }

    bool operator!=(iterator other) const { return !(*this == other); }
246
247

    std::pair<KeyType, ValueType> operator*() {
248
249
      return std::make_pair(currentNode->keys.get_ro()[currentPosition],
                            currentNode->values.get_ro()[currentPosition]);
250
251
    }

252
    /// iterator traits
253
254
255
256
257
258
259
260
    using difference_type = long;
    using value_type = std::pair<KeyType, ValueType>;
    using pointer = const std::pair<KeyType, ValueType>*;
    using reference = const std::pair<KeyType, ValueType>&;
    using iterator_category = std::forward_iterator_tag;
  };
  iterator begin() { return iterator(rootNode, depth); }
  iterator end() { return iterator(); }
261
262
263
264
265
266
267
268

  /* -------------------------------------------------------------------------------------------- */

  /**
   * Typedef for a function passed to the scan method.
   */
  using ScanFunc = std::function<void(const KeyType &key, const ValueType &val)>;

269
270
271
272
273
  /**
   * Constructor for creating a new  tree.
   */
  FPTree() {
    rootNode = newLeafNode();
274
275
    leafList = rootNode.leaf;
    depth = 0;
276
277
    LOG("created new FPTree with sizeof(BranchNode) = " << sizeof(BranchNode) <<
        ", sizeof(LeafNode) = " << sizeof(LeafNode));
278
279
280
281
282
  }

  /**
   * Destructor for the tree. Should delete all allocated nodes.
   */
283
  ~FPTree() {}
284
285

  /**
286
287
   * Insert an element (a key-value pair) into the tree. If the key @c key already exists, the
   * corresponding value is replaced by @c val.
288
289
290
291
292
293
294
   *
   * @param key the key of the element to be inserted
   * @param val the value that is associated with the key
   */
  void insert(const KeyType &key, const ValueType &val) {
    auto pop = pmem::obj::pool_by_vptr(this);
    transaction::run(pop, [&] {
295
      SplitInfo splitInfo;
296

297
298
      bool wasSplit = false;
      if (depth == 0) {
299
        /// the root node is a leaf node
300
301
        auto n = rootNode.leaf;
        wasSplit = insertInLeafNode(n, key, val, &splitInfo);
302
      } else {
303
        /// the root node is a branch node
304
305
        auto n = rootNode.branch;
        wasSplit = insertInBranchNode(n, depth, key, val, &splitInfo);
306
307
      }
      if (wasSplit) {
308
309
310
311
312
313
314
        /// we had an overflow in the node and therefore the node is split
        const auto root = newBranchNode();
        auto &rootRef = *root;
        rootRef.keys[0] = splitInfo.key;
        rootRef.children[0] = splitInfo.leftChild;
        rootRef.children[1] = splitInfo.rightChild;
        ++rootRef.numKeys;
315
316
        rootNode.branch = root;
        ++depth;
317
      }
318
319
320
321
    });
  }

  /**
322
   * Find the given @c key in the  tree and if found return the corresponding value.
323
324
   *
   * @param key the key we are looking for
325
   * @param[out] val a pointer to memory where the value is stored if the key was found
326
327
328
329
   * @return true if the key was found, false otherwise
   */
  bool lookup(const KeyType &key, ValueType *val)  {
    assert(val != nullptr);
330
331
    const auto leaf = findLeafNode(key);
    const auto pos = lookupPositionInLeafNode(leaf, key);
332
    if (pos < M) {
333
334
335
      /// we found it!
      *val = leaf->values.get_ro()[pos];
      return true;
336
    }
337
    return false;
338
339
340
341
342
343
344
345
346
347
348
349
  }

  /**
   * Delete the entry with the given key @c key from the tree.
   *
   * @param key the key of the entry to be deleted
   * @return true if the key was found and deleted
   */
  bool erase(const KeyType &key) {
    auto pop = pmem::obj::pool_by_vptr(this);
    bool result;
    transaction::run(pop, [&] {
350
      if (depth == 0) {
351
        /// special case: the root node is a leaf node and there is no need to handle underflow
352
353
354
        auto node = rootNode.leaf;
        assert(node != nullptr);
        result=eraseFromLeafNode(node, key);
355
      } else {
356
357
358
        auto node = rootNode.branch;
        assert(node != nullptr);
        result=eraseFromBranchNode(node, depth, key);
359
360
      }
    });
361
362
    return result;
  }
363

364
365
366
  /**
   * Recover the FPTree by iterating over the LeafList and using the recoveryInsert method.
   */
367
  void recover() {
368
    LOG("Starting RECOVERY of FPTree");
369
    pptr<LeafNode> currentLeaf = leafList;
370
    if (leafList == nullptr) {
371
      LOG("No data to recover FPTree");
372
      return;
373
    }
374
375
376
377
378
379
380
381
382
383
384
385
    /* counting leafs */
    auto leafs = 0u;
    while(currentLeaf != nullptr) {
      ++leafs;
      currentLeaf = currentLeaf->nextLeaf;
    }
    float x = std::log(leafs)/std::log(N+1);
    assert(x == int(x) && "Not supported for this amount of leafs, yet");

    /* actual recovery */
    currentLeaf = leafList;
    if (leafList->nextLeaf == nullptr) {
386
      /// The index has only one node, so the leaf node becomes the root node
387
388
      rootNode = leafList;
      depth = 0;
389
    } else {
390
      rootNode = newBranchNode();
391
      depth = 1;
392
393
      rootNode.branch->children[0] = currentLeaf;
      currentLeaf = currentLeaf->nextLeaf;
394
395
396
397
398
      while (currentLeaf != nullptr) {
        recoveryInsert(currentLeaf);
        currentLeaf = currentLeaf->nextLeaf;
      }
    }
399
    LOG("RECOVERY Done")
400
401
402
403
404
405
  }

  /**
   * Print the structure and content of the tree to stdout.
   */
  void print() const {
406
407
    if (depth == 0) printLeafNode(0, rootNode.leaf);
    else printBranchNode(0u, rootNode.branch);
408
409
410
411
412
413
414
415
  }

  /**
   * Perform a scan over all key-value pairs stored in the tree.
   * For each entry the given function @func is called.
   *
   * @param func the function called for each entry
   */
416
  void scan(ScanFunc func) const {
417
    /// we traverse to the leftmost leaf node
418
419
    auto node = rootNode;
    auto d = depth;
420
    while ( d-- > 0) node = node.branch->children[0];
421
    auto leaf = node.leaf;
422
    auto &leafRef = *leaf;
423
    while (leaf != nullptr) {
424
425
426
427
428
      /// for each key-value pair call func
      for (auto i = 0u; i < leafRef.numKeys.get_ro(); i++) {
        if (!leafRef.search.get_ro().b.test(i)) continue;
        const auto &key = leafRef.keys.get_ro()[i];
        const auto &val = leafRef.values.get_ro()[i];
429
430
        func(key, val);
      }
431
432
      /// move to the next leaf node
      leaf = leafRef.nextLeaf;
433
    }
434
  }
435
436

  /**
437
438
   * Perform a range scan over all elements within the range [minKey, maxKey] and for each element
   * call the given function @c func.
439
440
441
442
443
   *
   * @param minKey the lower boundary of the range
   * @param maxKey the upper boundary of the range
   * @param func the function called for each entry
   */
444
  void scan(const KeyType &minKey, const KeyType &maxKey, ScanFunc func) const {
445
446
447
448
    auto leaf = findLeafNode(minKey);

    bool higherThanMax = false;
    while (!higherThanMax && leaf != nullptr) {
449
450
      /// for each key-value pair within the range call func
      const auto leafRef = *leaf;
451
      for (auto i = 0u; i < M; i++) {
452
453
        if (!leafRef.search.get_ro().b.test(i)) continue;
        auto &key = leafRef.keys.get_ro()[i];
454
455
456
        if (key < minKey) continue;
        if (key > maxKey) { higherThanMax = true; continue; }

457
        auto &val = leafRef.values.get_ro()[i];
458
459
        func(key, val);
      }
460
461
      /// move to the next leaf node
      leaf = leafRef.nextLeaf;
462
    }
463
  }
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478

#ifndef UNIT_TESTS
  private:
#endif

  /**
   * Insert a (key, value) pair into the corresponding leaf node. It is the
   * responsibility of the caller to make sure that the node @c node is
   * the correct node. The key is inserted at the correct position.
   *
   * @param node the node where the key-value pair is inserted.
   * @param key the key to be inserted
   * @param val the value associated with the key
   * @param splitInfo information about a possible split of the node
   */
479
  bool insertInLeafNode(const pptr<LeafNode> &node, const KeyType &key,
480
      const ValueType &val, SplitInfo *splitInfo) {
481
    auto &nodeRef = *node;
482
483
484
    bool split = false;
    auto pos = lookupPositionInLeafNode(node, key);

485
    if (pos < M) {
486
      /// handle insert of duplicates
487
      nodeRef.values.get_rw()[pos] = val;
488
      return false;
489
490
    }
    pos = nodeRef.search.get_ro().getFreeZero();
491
    if (pos == M) {
492
493
494
495
496
497
498
499
500
      /* split the node */
      splitLeafNode(node, splitInfo);
      auto &splitRef = *splitInfo;
      auto sibling= splitRef.rightChild.leaf;
      auto &sibRef= *sibling;

      /* insert the new entry */
      if (key < splitRef.key)
        insertInLeafNodeAtPosition(node, nodeRef.search.get_ro().getFreeZero(), key, val);
501
      else
502
        insertInLeafNodeAtPosition(sibling, sibRef.search.get_ro().getFreeZero(), key, val);
503

504
505
      /* inform the caller about the split */
      splitRef.key = sibRef.keys.get_ro()[findMinKeyAtLeafNode(sibling)];
506
507
      split = true;
    } else {
508
      /* otherwise, we can simply insert the new entry at the given position */
509
510
511
512
513
      insertInLeafNodeAtPosition(node, pos, key, val);
    }
    return split;
  }

514
  void splitLeafNode(const pptr<LeafNode> &node, SplitInfo *splitInfo) {
515
516
517
518
519
520
521
      auto &nodeRef = *node;

      /* determine the split position by finding median in unsorted array of keys*/
      auto data = nodeRef.keys.get_ro();
      auto bAndKey = findSplitKey(data);
      auto &splitKey = bAndKey.second;

522
523
      /// copy leaf
      pptr<LeafNode> sibling = newLeafNode(node);
524
525
526
      auto &sibRef = *sibling;
      nodeRef.search.get_rw().b = bAndKey.first;
      sibRef.search.get_rw().b = bAndKey.first.flip();
527
      //PersistEmulation::writeBytes(sizeof(LeafNode) + ((2*M+7)>>3)); /// copy leaf + 2 bitmaps
528
529
530
531

      /* Alternative: move instead of complete copy *//*
      auto data = nodeRef.keys.get_ro();
      auto splitKey = ElementOfRankK::elementOfRankK((M+1)/2, data, 0, M);
532
      pptr<LeafNode> sibling = newLeafNode();
533
534
535
536
537
538
539
540
541
542
543
544
      auto &sibRef = *sibling;
      auto j = 0u;
      for(auto i = 0u; i < M; i++) {
        if(nodeRef.keys.get_ro()[i] > splitKey) {
          sibRef.keys.get_rw()[j] = nodeRef.keys.get_ro()[i];
          sibRef.values.get_rw()[j] = nodeRef.values.get_ro()[i];
          sibRef.search.get_rw().fp[j] = nodeRef.search.get_ro().fp[i];
          sibRef.search.get_rw().b.set(j);
          nodeRef.search.get_rw().b.reset(i);
          j++;
        }
      }
545
      PersistEmulation::writeBytes(j * (sizeof(KeyType) + sizeof(ValueType) + 1) + ((j*2+7)>>3)); /// j entries/hashes + j*2 bits*/
546

547
      /// setup the list of leaf nodes
548
549
550
551
552
553
554
555
      if (nodeRef.nextLeaf != nullptr) {
        sibRef.nextLeaf = nodeRef.nextLeaf;
        nodeRef.nextLeaf->prevLeaf = sibling;
      }
      nodeRef.nextLeaf = sibling;
      sibRef.prevLeaf = node;
      //PersistEmulation::writeBytes(16*2);

556
      /// set split information
557
558
559
560
561
562
      auto &splitRef = *splitInfo;
      splitRef.leftChild = node;
      splitRef.rightChild = sibling;
      splitRef.key = splitKey;
  }

563
564
565
566
567
568
569
570
571
572
573
574
  /**
   * Insert a (key, value) pair at the given position @c pos into the leaf node
   * @c node. The caller has to ensure that
   * - there is enough space to insert the element
   * - the key is inserted at the correct position according to the order of
   * keys
   *
   * @oaram node the leaf node where the element is to be inserted
   * @param pos the position in the leaf node (0 <= pos <= numKeys < M)
   * @param key the key of the element
   * @param val the actual value corresponding to the key
   */
575
  void insertInLeafNodeAtPosition(const pptr<LeafNode> &node, const unsigned int pos,
576
577
      const KeyType &key, const ValueType &val) {
    assert(pos < M);
578
    auto &nodeRef = *node;
579

580
581
582
    /// insert the new entry at the given position
    nodeRef.keys.get_rw()[pos] = key;
    nodeRef.values.get_rw()[pos] = val;
583
584
    //PersistEmulation::persistStall();

585
586
587
    /// set bit and hash
    nodeRef.search.get_rw().b.set(pos);
    nodeRef.search.get_rw().fp[pos] = fpHash(key);
588
589
590
    //PersistEmulation::persistStall();
    if(sizeof(LeafSearch) > 64) PersistEmulation::persistStall();
    //PersistEmulation::writeBytes(sizeof(KeyType) + sizeof(ValueType) + 2);
591
592
593
594
595
596
597
  }

  /**
   * Insert a (key, value) pair into the tree recursively by following the path
   * down to the leaf level starting at node @c node at depth @c depth.
   *
   * @param node the starting node for the insert
598
   * @param d the current depth of the tree (0 == leaf level)
599
600
601
602
603
   * @param key the key of the element
   * @param val the actual value corresponding to the key
   * @param splitInfo information about the split
   * @return true if a split was performed
   */
604
605
  bool insertInBranchNode(BranchNode * const node, unsigned int d, const KeyType &key,
                          const ValueType &val, SplitInfo *splitInfo) {
606
607
    SplitInfo childSplitInfo;
    bool split = false, hasSplit = false;
608
    auto &nodeRef = *node;
609
610

    auto pos = lookupPositionInBranchNode(node, key);
611
612
613
    if (d == 1) {
      /// case #1: our children are leaf nodes
      auto child = nodeRef.children[pos].leaf;
614
      hasSplit = insertInLeafNode(child, key, val, &childSplitInfo);
615
    } else {
616
617
618
      /// case #2: our children are branch nodes
      auto child = nodeRef.children[pos].branch;
      hasSplit = insertInBranchNode(child, d - 1, key, val, &childSplitInfo);
619
620
    }
    if (hasSplit) {
621
622
623
624
      auto host = node;
      /// the child node was split, thus we have to add a new entry
      /// to our branch node
      if (nodeRef.numKeys == N) {
625
        splitBranchNode(node, childSplitInfo.key, splitInfo);
626
627
        const auto &splitRef = *splitInfo;
        host = (key < splitRef.key ? splitRef.leftChild : splitRef.rightChild).branch;
628
629
630
        split = true;
        pos = lookupPositionInBranchNode(host, key);
      }
631
632
633
634
635
636
637
638
      auto &hostRef = *host;
      if (pos < hostRef.numKeys) {
        /// if the child isn't inserted at the rightmost position
        /// then we have to make space for it
        hostRef.children[hostRef.numKeys + 1] = hostRef.children[hostRef.numKeys];
        for (auto i = hostRef.numKeys; i > pos; i--) {
          hostRef.children[i] = hostRef.children[i - 1];
          hostRef.keys[i] = hostRef.keys[i - 1];
639
640
        }
      }
641
642
643
644
645
      /// finally, add the new entry at the given position
      hostRef.keys[pos] = childSplitInfo.key;
      hostRef.children[pos] = childSplitInfo.leftChild;
      hostRef.children[pos + 1] = childSplitInfo.rightChild;
      hostRef.numKeys = hostRef.numKeys + 1;
646
647
648
649
650
    }
    return split;
  }

  /**
651
652
   * Split the given branch node @c node in the middle and move half of the keys/children to the new
   * sibling node.
653
654
655
656
657
   *
   * @param node the branch node to be split
   * @param splitKey the key on which the split of the child occured
   * @param splitInfo information about the split
   */
658
659
660
661
662
663
664
665
666
667
668
669
670
671
  void splitBranchNode(BranchNode * const node, const KeyType &splitKey, SplitInfo *splitInfo) {
    auto &nodeRef = *node;
    /// determine the split position
    auto middle = (N + 1) / 2;
    /// adjust the middle based on the key we have to insert
    if (splitKey > nodeRef.keys[middle]) middle++;

    /// move all entries behind this position to a new sibling node
    const auto sibling = newBranchNode();
    auto &sibRef = *sibling;
    sibRef.numKeys = nodeRef.numKeys - middle;
    for (auto i = 0u; i < sibRef.numKeys; i++) {
      sibRef.keys[i] = nodeRef.keys[middle + i];
      sibRef.children[i] = nodeRef.children[middle + i];
672
    }
673
674
    sibRef.children[sibRef.numKeys] = nodeRef.children[nodeRef.numKeys];
    nodeRef.numKeys = middle - 1;
675

676
677
678
679
    auto &splitRef = *splitInfo;
    splitRef.key = nodeRef.keys[middle - 1];
    splitRef.leftChild = node;
    splitRef.rightChild = sibling;
680
681
682
  }

  /**
683
684
685
   * Traverse the tree starting at the root until the leaf node is found that could contain the
   * given @key. Note, that always a leaf node is returned even if the key doesn't exist on this
   * node.
686
687
688
689
   *
   * @param key the key we are looking for
   * @return the leaf node that would store the key
   */
690
  pptr<LeafNode> findLeafNode(const KeyType &key) const {
691
692
    auto node = rootNode;
    auto d = depth;
693
    while (d-- > 0) {
694
695
696
697
698
699
700
      auto n = node.branch;
      auto pos = lookupPositionInBranchNode(n, key);
      node = n->children[pos];
    }
    return node.leaf;
  }
  /**
701
   * Lookup the search key @c key in the given leaf node and return the position.
702
   * If the search key was not found, then @c M is returned.
703
704
705
   *
   * @param node the leaf node where we search
   * @param key the search key
706
   * @return the position of the key  (or @c M if not found)
707
   */
708
709
  auto lookupPositionInLeafNode(const pptr<LeafNode> &node, const KeyType &key) const {
    auto pos = 0u;
Philipp Götze's avatar
Philipp Götze committed
710
    const auto &nodeRef = *node;
711
    const auto hash = fpHash(key);
Philipp Götze's avatar
Philipp Götze committed
712
713
    const auto &keys = nodeRef.keys.get_ro();
    const auto &search = nodeRef.search.get_ro();
714
    for (; pos < M ; pos++) {
715
716
717
      if(search.fp[pos] == hash &&
         search.b.test(pos) &&
         keys[pos] == key)
718
719
        break;
    }
720
721
722
723
    return pos;
  }

  /**
724
725
726
   * Lookup the search key @c key in the given branch node and return the position which is the
   * position in the list of keys + 1. in this way, the position corresponds to the position of the
   * child pointer in the array @children.
727
728
729
   * If the search key is less than the smallest key, then @c 0 is returned.
   * If the key is greater than the largest key, then @c numKeys is returned.
   *
730
   * @param node the branch node where we search
731
732
733
   * @param key the search key
   * @return the position of the key + 1 (or 0 or @c numKey)
   */
734
  auto  lookupPositionInBranchNode(const BranchNode * const node, const KeyType &key) const {
735
    const auto &num = node->numKeys;
736
737
    //auto pos = 0u;
    //const auto &keys = nodeRef.keys;
738
739
    //for (; pos < num && keys[pos] <= key; pos++);
    //return pos;
740
    return binarySearch(node, 0, num - 1, key);
741
742
  }

743
  auto binarySearch(const BranchNode * const node, int l, int r, const KeyType &key) const {
744
    auto pos = 0u;
745
    const auto &keys = node->keys;
746
747
    while (l <= r) {
      pos = (l + r) / 2;
748
749
      if (keys[pos] == key) return ++pos;
      if (keys[pos] < key) l = ++pos;
750
751
      else r = pos - 1;
    }
752
753
754
755
756
757
758
759
760
761
    return pos;
  }

  /**
   * Delete the element with the given key from the given leaf node.
   *
   * @param node the leaf node from which the element is deleted
   * @param key the key of the element to be deleted
   * @return true of the element was deleted
   */
762
  bool eraseFromLeafNode(const pptr<LeafNode> &node, const KeyType &key) {
763
    auto pos = lookupPositionInLeafNode(node, key);
764
    if (pos < M) {
765
      node->search.get_rw().b.reset(pos);
766
      return true;
767
    }
768
    return false;
769
770
771
  }

  /**
772
773
   * Delete an entry from the tree by recursively going down to the leaf level and handling the
   * underflows.
774
775
776
777
778
779
   *
   * @param node the current branch node
   * @param d the current depth of the traversal
   * @param key the key to be deleted
   * @return true if the entry was deleted
   */
780
781
  bool eraseFromBranchNode(BranchNode * const node, unsigned int d, const KeyType &key) {
    auto &nodeRef = *node;
782
783
    assert(d >= 1);
    bool deleted = false;
784
    /// try to find the branch
785
    auto pos = lookupPositionInBranchNode(node, key);
786
    if (d == 1) {
787
788
      /// the next level is the leaf level
      auto leaf = nodeRef.children[pos].leaf;
789
790
      assert(leaf != nullptr);
      deleted = eraseFromLeafNode(leaf, key);
791
      constexpr auto middle = (M + 1) / 2;
792
      if (leaf->search.get_ro().b.count() < middle) {
793
        /// handle underflow
794
        underflowAtLeafLevel(node, pos, leaf);
795
796
      }
    } else {
797
      auto child = nodeRef.children[pos].branch;
798
799
800
      deleted = eraseFromBranchNode(child, d - 1, key);

      pos = lookupPositionInBranchNode(node, key);
801
      constexpr auto middle = (N + 1) / 2;
802
      if (child->numKeys < middle) {
803
        /// handle underflow
804
        child = underflowAtBranchLevel(node, pos, child);
805
806
        if (d == depth && nodeRef.numKeys == 0) {
          /// special case: the root node is empty now
807
          rootNode = child;
808
          --depth;
809
810
811
812
813
814
815
        }
      }
    }
    return deleted;
  }

  /**
816
817
   * Handle the case that during a delete operation a underflow at node @c leaf occured.
   * If possible this is handled
818
819
820
821
   * (1) by rebalancing the elements among the leaf node and one of its siblings
   * (2) if not possible by merging with one of its siblings.
   *
   * @param node the parent node of the node where the underflow occured
822
   * @param pos the position of the child node @leaf in the @c children array of the branch node
823
824
   * @param leaf the node at which the underflow occured
   */
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
  void underflowAtLeafLevel(BranchNode * const node, unsigned int pos, const pptr<LeafNode> &leaf) {
    auto &nodeRef = *node;
    auto &leafRef = *leaf;
    assert(pos <= nodeRef.numKeys);
    constexpr auto middle = (M + 1) / 2;
    /// 1. we check whether we can rebalance with one of the siblings but only if both nodes have
    ///    the same direct parent
    if (pos > 0 && leafRef.prevLeaf->search.get_ro().b.count() > middle) {
      /// we have a sibling at the left for rebalancing the keys
      balanceLeafNodes(leafRef.prevLeaf, leaf);

      nodeRef.keys[pos - 1] = leafRef.keys.get_ro()[findMinKeyAtLeafNode(leaf)];
    } else if (pos < nodeRef.numKeys && leafRef.nextLeaf->search.get_ro().b.count() > middle) {
      /// we have a sibling at the right for rebalancing the keys
      balanceLeafNodes(leafRef.nextLeaf, leaf);

      nodeRef.keys[pos] = leafRef.nextLeaf->keys.get_ro()[findMinKeyAtLeafNode(leafRef.nextLeaf)];
842
    } else {
843
844
845
846
847
      /// 2. if this fails we have to merge two leaf nodes but only if both nodes have the same
      ///    direct parent
      pptr<LeafNode> survivor = nullptr;
      if (pos > 0 && leafRef.prevLeaf->search.get_ro().b.count() <= middle) {
        survivor = mergeLeafNodes(leafRef.prevLeaf, leaf);
848
        deleteLeafNode(leaf);
849
850
851
      } else if (pos < nodeRef.numKeys && leafRef.nextLeaf->search.get_ro().b.count() <= middle) {
        /// because we update the pointers in mergeLeafNodes we keep it here
        auto l = leafRef.nextLeaf;
852
853
        survivor = mergeLeafNodes(leaf, l);
        deleteLeafNode(l);
854
      } else {
855
        /// this shouldn't happen?!
856
857
        assert(false);
      }
858
      if (nodeRef.numKeys > 1) {
859
        if (pos > 0) pos--;
860
861
862
863
        /// just remove the child node from the current branch node
        for (auto i = pos; i < nodeRef.numKeys - 1; i++) {
          nodeRef.keys[i] = nodeRef.keys[i + 1];
          nodeRef.children[i + 1] = nodeRef.children[i + 2];
864
        }
865
866
        nodeRef.children[pos] = survivor;
        --nodeRef.numKeys;
867
      } else {
868
869
        /// This is a special case that happens only if the current node is the root node.
        /// Now, we have to replace the branch root node by a leaf node.
870
871
        rootNode = survivor;
        --depth;
872
873
      }
    }
874
  }
875
876

  /**
877
878
879
   * Handle the case that during a delete operation a underflow at node @c child occured where @c
   * node is the parent node. If possible this is handled
   * (1) by rebalancing the elements among the node @c child and one of its siblings
880
881
882
   * (2) if not possible by merging with one of its siblings.
   *
   * @param node the parent node of the node where the underflow occured
883
   * @param pos the position of the child node @child in the @c children array of the branch node
884
885
886
   * @param child the node at which the underflow occured
   * @return the (possibly new) child node (in case of a merge)
   */
887
888
  BranchNode* underflowAtBranchLevel(BranchNode * const node, unsigned int pos,
                                     BranchNode * const child) {
889
890
    assert(node != nullptr);
    assert(child != nullptr);
891
892
893
894
    auto &nodeRef = *node;
    auto newChild = child;
    constexpr auto middle = (N + 1) / 2;
    /// 1. we check whether we can rebalance with one of the siblings
895
896

    if (pos > 0 &&
897
898
899
        nodeRef.children[pos - 1].branch->numKeys >middle) {
      /// we have a sibling at the left for rebalancing the keys
      auto sibling = nodeRef.children[pos - 1].branch;
900
      balanceBranchNodes(sibling, child, node, pos - 1);
901
      /// nodeRef.keys.get_rw()[pos] = child->keys.get_ro()[0];
902
      return newChild;
903
904
905
    } else if (pos < nodeRef.numKeys && nodeRef.children[pos + 1].branch->numKeys > middle) {
      /// we have a sibling at the right for rebalancing the keys
      auto sibling = nodeRef.children[pos + 1].branch;
906
907
908
      balanceBranchNodes(sibling, child, node, pos);
      return newChild;
    } else {
909
      /// 2. if this fails we have to merge two branch nodes
910
911
912
      BranchNode *lSibling = nullptr, *rSibling = nullptr;
      unsigned int prevKeys = 0, nextKeys = 0;
      if (pos > 0) {
913
        lSibling = nodeRef.children[pos - 1].branch;
914
915
        prevKeys = lSibling->numKeys;
      }
916
917
      if (pos < nodeRef.numKeys) {
        rSibling = nodeRef.children[pos + 1].branch;
918
919
920
921
922
        nextKeys = rSibling->numKeys;
      }
      BranchNode *witnessNode = nullptr;
      auto ppos = pos;
      if (prevKeys > 0) {
923
        mergeBranchNodes(lSibling, nodeRef.keys[pos - 1], child);
924
925
926
        ppos = pos - 1;
        witnessNode = child;
        newChild = lSibling;
927
        /// pos -= 1;
928
      } else if (nextKeys > 0) {
929
        mergeBranchNodes(child, nodeRef.keys[pos], rSibling);
930
931
        witnessNode = rSibling;
      } else
932
        /// shouldn't happen
933
        assert(false);
934
935
936
      /// remove nodeRef.keys.get_ro()[pos] from node
      for (auto i = ppos; i < nodeRef.numKeys - 1; i++) {
        nodeRef.keys[i] = nodeRef.keys[i + 1];
937
938
      }
      if (pos == 0) pos++;
939
940
941
      for (auto i = pos; i < nodeRef.numKeys; i++) {
        if (i + 1 <= nodeRef.numKeys) {
          nodeRef.children[i] = nodeRef.children[i + 1];
942
943
        }
      }
944
      nodeRef.numKeys--;
945
946
947
948
949
950
      deleteBranchNode(witnessNode);
      return newChild;
    }
  }

  /**
951
952
953
   * Redistribute (key, value) pairs from the leaf node @c donor to the leaf node @c receiver such
   * that both nodes have approx. the same number of elements. This method is used in case of an
   * underflow situation of a leaf node.
954
955
956
957
   *
   * @param donor the leaf node from which the elements are taken
   * @param receiver the sibling leaf node getting the elements from @c donor
   */
958
959
960
961
962
  void balanceLeafNodes(const pptr<LeafNode> &donor, const pptr<LeafNode> &receiver) {
    auto &donorRef = *donor;
    auto &receiverRef = *receiver;
    const auto dNumKeys = donorRef.search.get_ro().b.count();
    const auto rNumKeys = receiverRef.search.get_ro().b.count();
963
964
965
    assert(dNumKeys > rNumKeys);
    unsigned int balancedNum = (dNumKeys + rNumKeys) / 2;
    unsigned int toMove = dNumKeys - balancedNum;
966
967
    if (toMove == 0) return;

968
969
    if (donorRef.keys.get_ro()[0] < receiverRef.keys.get_ro()[0]) {
      /// move to a node with larger keys
970
971
      for (auto i = 0u; i < toMove; i++) {
        const auto max = findMaxKeyAtLeafNode(donor);
972
        const auto pos = receiverRef.search.get_ro().getFreeZero();
973

974
975
976
977
978
        receiverRef.search.get_rw().b.set(pos);
        receiverRef.search.get_rw().fp[pos] = fpHash(donorRef.keys.get_ro()[max]);
        receiverRef.keys.get_rw()[pos] = donorRef.keys.get_ro()[max];
        receiverRef.values.get_rw()[pos] = donorRef.values.get_ro()[max];
        donorRef.search.get_rw().b.reset(max);
979
980
      }
    } else {
981
      /// move to a node with smaller keys
982
983
      for (auto i = 0u; i < toMove; i++) {
        const auto min = findMinKeyAtLeafNode(donor);
984
        const auto pos = receiverRef.search.get_ro().getFreeZero();
985

986
987
988
989
990
        receiverRef.search.get_rw().b.set(pos);
        receiverRef.search.get_rw().fp[pos] = fpHash(donorRef.keys.get_ro()[min]);
        receiverRef.keys.get_rw()[pos] = donorRef.keys.get_ro()[min];
        receiverRef.values.get_rw()[pos] = donorRef.values.get_ro()[min];
        donorRef.search.get_rw().b.reset(min);
991
      }
992
    }
993
  }
994

995
  /**
996
   * Find position of the minimum key in unsorted leaf.
997
   *
998
999
1000
   * @param node the leaf node to find the minimum key in
   * @return position of the minimum key
   */
For faster browsing, not all history is shown. View entire blame