FPTree.hpp 44.7 KB
Newer Older
1
/*
2
 * Copyright (C) 2017-2019 DBIS Group - TU Ilmenau, All Rights Reserved.
3
 *
4
 * This file is part of our NVM-based Data Structures repository.
5
 *
6
7
8
 * This program is free software: you can redistribute it and/or modify it under the terms of the
 * GNU General Public License as published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
9
 *
10
11
12
 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 * See the GNU General Public License for more details.
13
 *
14
15
 * You should have received a copy of the GNU General Public License along with this program.
 * If not, see <http://www.gnu.org/licenses/>.
16
17
18
19
20
21
 */

#ifndef DBIS_FPTree_hpp_
#define DBIS_FPTree_hpp_

#include <array>
22
#include <bitset>
23
#include <cmath>
24
25
26
27
28
29
30
31
#include <iostream>

#include <libpmemobj++/make_persistent.hpp>
#include <libpmemobj++/p.hpp>
#include <libpmemobj++/persistent_ptr.hpp>
#include <libpmemobj++/transaction.hpp>
#include <libpmemobj++/utils.hpp>

32
#include "config.h"
33
#include "utils/ElementOfRankK.hpp"
34
#include "utils/PersistEmulation.hpp"
35
36
37
38
39
40
41
42

namespace dbis::fptree {

using pmem::obj::delete_persistent;
using pmem::obj::make_persistent;
using pmem::obj::p;
using pmem::obj::persistent_ptr;
using pmem::obj::transaction;
43
44
template <typename Object>
using pptr = persistent_ptr<Object>;
45
46
47
48
49
50
51
52
53
54
55

/**
 * A persistent memory implementation of a FPTree.
 *
 * @tparam KeyType the data type of the key
 * @tparam ValueType the data type of the values associated with the key
 * @tparam N the maximum number of keys on a branch node
 * @tparam M the maximum number of keys on a leaf node
 */
template<typename KeyType, typename ValueType, int N, int M>
class FPTree {
56
  /// we need at least two keys on a branch node to be able to split
57
  static_assert(N > 2, "number of branch keys has to be >2.");
58
  /// we need at least one key on a leaf node
59
60
61
62
63
64
65
66
  static_assert(M > 0, "number of leaf keys should be >0.");

#ifndef UNIT_TESTS
  private:
#else
  public:
#endif

67
  /// Forward declarations
68
69
70
  struct LeafNode;
  struct BranchNode;

71
72
  struct Node {
    Node() : tag(BLANK) {};
73

74
    Node(const pptr<LeafNode> &leaf_) : tag(LEAF), leaf(leaf_) {};
75

76
    Node(BranchNode *branch_) : tag(BRANCH), branch(branch_) {};
77

78
    Node(const Node &other) { copy(other); };
79

80
    void copy(const Node &other) throw() {
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
      tag = other.tag;

      switch (tag) {
        case LEAF: {
          leaf = other.leaf;
          break;
        }
        case BRANCH: {
          branch = other.branch;
          break;
        }
        default: break;
      }
    }

96
    Node &operator=(Node other) {
97
98
99
100
101
      copy(other);
      return *this;
    }

    enum NodeType {
102
      BLANK, LEAF, BRANCH
103
104
    } tag;
    union {
105
      pptr<LeafNode> leaf;
106
107
108
109
      BranchNode *branch;
    };
  };

110
  struct alignas(64) LeafSearch {
111
112
    std::bitset<M> b;          ///< bitset for valid entries
    std::array<uint8_t, M> fp; ///< fingerprint array (n & 0xFF)
113
114
115
116
117
118
119
120

    unsigned int getFreeZero() const {
      unsigned int idx = 0;
      while (idx < M && b.test(idx)) ++idx;
      return idx;
    }
  };

121
122
123
124
125
126
127
  /**
   * A structure for representing a leaf node of a B+ tree.
   */
  struct LeafNode {
    /**
     * Constructor for creating a new empty leaf node.
     */
128
129
    LeafNode() : nextLeaf(nullptr), prevLeaf(nullptr) {}

130
131
132
133
134
    p<LeafSearch> search;               ///< helper structure for faster searches
    p<std::array<KeyType, M>> keys;     ///< the actual keys
    p<std::array<ValueType, M>> values; ///< the actual values
    pptr<LeafNode> nextLeaf;            ///< pointer to the subsequent sibling
    pptr<LeafNode> prevLeaf;            ///< pointer to the preceeding sibling
135
136
137
138
139
140
141
142
143
144
145
  };

  /**
   * A structure for representing an branch node (branch node) of a B+ tree.
   */
  struct BranchNode {
    /**
     * Constructor for creating a new empty branch node.
     */
    BranchNode() : numKeys(0) {}

146
147
148
    unsigned int numKeys;             ///< the number of currently stored keys
    std::array<KeyType, N> keys;      ///< the actual keys
    std::array<Node, N + 1> children; ///< pointers to child nodes (BranchNode or LeafNode)
149
150
151
152
153
  };

  /**
   * Create a new empty leaf node
   */
154
  pptr<LeafNode> newLeafNode() {
155
    auto pop = pmem::obj::pool_by_vptr(this);
156
157
    pptr<LeafNode> newNode = nullptr;
    transaction::run(pop, [&] { newNode = make_persistent<LeafNode>(); });
158
159
160
    return newNode;
  }

161
  pptr<LeafNode> newLeafNode(const pptr<LeafNode> &other) {
162
    auto pop = pmem::obj::pool_by_vptr(this);
163
164
    pptr<LeafNode> newNode = nullptr;
    transaction::run(pop, [&] { newNode = make_persistent<LeafNode>(*other); });
165
166
167
    return newNode;
  }

168
  void deleteLeafNode(const pptr<LeafNode> &node) {
169
    auto pop = pmem::obj::pool_by_vptr(this);
170
    transaction::run(pop, [&] { delete_persistent<LeafNode>(node); });
171
172
173
174
175
176
  }

  /**
   * Create a new empty branch node
   */
  BranchNode *newBranchNode() {
177
    return new BranchNode();
178
179
  }

180
  void deleteBranchNode(const BranchNode * const node) {
181
    delete node;
182
183
184
  }

  /**
185
   * A structure for passing information about a node split to the caller.
186
187
   */
  struct SplitInfo {
188
189
190
    KeyType key;     ///< the key at which the node was split
    Node leftChild;  ///< the resulting lhs child node
    Node rightChild; ///< the resulting rhs child node
191
192
  };

193
194
  unsigned int depth;      /**< the depth of the tree, i.e. the number of levels
                                (0 => rootNode is LeafNode) */
195

196
197
  Node rootNode;           ///< pointer to the root node
  pptr<LeafNode> leafList; ///< pointer to the leaf at the most left position (for recovery)
198

199
200
201
  /* -------------------------------------------------------------------------------------------- */

 public:
202
203
204
205
  /**
   * Iterator for iterating over the leaf nodes
   */
  class iterator {
206
207
    pptr<LeafNode> currentNode;
    unsigned int currentPosition;
208
209
210

    public:
    iterator() : currentNode(nullptr), currentPosition(0) {}
211
212
    iterator(const Node &root, unsigned int d) {
      /// traverse to left-most key
213
      auto node = root;
214
      while (d-- > 0) node = node.branch->children[0];
215
216
      currentNode = node.leaf;
      currentPosition = 0;
217
218
219
      const auto &nodeRef = *currentNode;
      /// Can not overflow as there are at least M/2 entries
      while(!nodeRef.search.get_ro().b.test(currentPosition)) ++currentPosition;
220
221
222
    }

    iterator& operator++() {
223
      if (currentPosition >= M-1) {
224
225
        currentNode = currentNode->nextLeaf;
        currentPosition = 0;
226
        if (currentNode == nullptr) return *this;
227
228
        const auto &nodeRef = *currentNode;
        while(!nodeRef.search.get_ro().b.test(currentPosition)) ++currentPosition;
229
      } else if (!currentNode->search.get_ro().b.test(++currentPosition)) ++(*this);
230
231
232
      return *this;
    }

233
234
235
236
237
238
239
240
241
242
243
    iterator operator++(int) {
      iterator retval = *this;
      ++(*this);
      return retval;
    }

    bool operator==(iterator other) const {
      return (currentNode == other.currentNode && currentPosition == other.currentPosition);
    }

    bool operator!=(iterator other) const { return !(*this == other); }
244
245

    std::pair<KeyType, ValueType> operator*() {
246
247
      return std::make_pair(currentNode->keys.get_ro()[currentPosition],
                            currentNode->values.get_ro()[currentPosition]);
248
249
    }

250
    /// iterator traits
251
252
253
254
255
256
257
258
    using difference_type = long;
    using value_type = std::pair<KeyType, ValueType>;
    using pointer = const std::pair<KeyType, ValueType>*;
    using reference = const std::pair<KeyType, ValueType>&;
    using iterator_category = std::forward_iterator_tag;
  };
  iterator begin() { return iterator(rootNode, depth); }
  iterator end() { return iterator(); }
259
260
261
262
263
264
265
266

  /* -------------------------------------------------------------------------------------------- */

  /**
   * Typedef for a function passed to the scan method.
   */
  using ScanFunc = std::function<void(const KeyType &key, const ValueType &val)>;

267
268
269
270
271
  /**
   * Constructor for creating a new  tree.
   */
  FPTree() {
    rootNode = newLeafNode();
272
273
    leafList = rootNode.leaf;
    depth = 0;
274
275
    LOG("created new FPTree with sizeof(BranchNode) = " << sizeof(BranchNode) <<
        ", sizeof(LeafNode) = " << sizeof(LeafNode));
276
277
278
279
280
  }

  /**
   * Destructor for the tree. Should delete all allocated nodes.
   */
281
  ~FPTree() {}
282
283

  /**
284
285
   * Insert an element (a key-value pair) into the tree. If the key @c key already exists, the
   * corresponding value is replaced by @c val.
286
287
288
289
290
291
292
   *
   * @param key the key of the element to be inserted
   * @param val the value that is associated with the key
   */
  void insert(const KeyType &key, const ValueType &val) {
    auto pop = pmem::obj::pool_by_vptr(this);
    transaction::run(pop, [&] {
293
      SplitInfo splitInfo;
294

295
296
      bool wasSplit = false;
      if (depth == 0) {
297
        /// the root node is a leaf node
298
299
        auto n = rootNode.leaf;
        wasSplit = insertInLeafNode(n, key, val, &splitInfo);
300
      } else {
301
        /// the root node is a branch node
302
303
        auto n = rootNode.branch;
        wasSplit = insertInBranchNode(n, depth, key, val, &splitInfo);
304
305
      }
      if (wasSplit) {
306
307
308
309
310
311
312
        /// we had an overflow in the node and therefore the node is split
        const auto root = newBranchNode();
        auto &rootRef = *root;
        rootRef.keys[0] = splitInfo.key;
        rootRef.children[0] = splitInfo.leftChild;
        rootRef.children[1] = splitInfo.rightChild;
        ++rootRef.numKeys;
313
314
        rootNode.branch = root;
        ++depth;
315
      }
316
317
318
319
    });
  }

  /**
320
   * Find the given @c key in the  tree and if found return the corresponding value.
321
322
   *
   * @param key the key we are looking for
323
   * @param[out] val a pointer to memory where the value is stored if the key was found
324
325
326
327
   * @return true if the key was found, false otherwise
   */
  bool lookup(const KeyType &key, ValueType *val)  {
    assert(val != nullptr);
328
329
    const auto leaf = findLeafNode(key);
    const auto pos = lookupPositionInLeafNode(leaf, key);
330
    if (pos < M) {
331
332
333
      /// we found it!
      *val = leaf->values.get_ro()[pos];
      return true;
334
    }
335
    return false;
336
337
338
339
340
341
342
343
344
345
346
347
  }

  /**
   * Delete the entry with the given key @c key from the tree.
   *
   * @param key the key of the entry to be deleted
   * @return true if the key was found and deleted
   */
  bool erase(const KeyType &key) {
    auto pop = pmem::obj::pool_by_vptr(this);
    bool result;
    transaction::run(pop, [&] {
348
      if (depth == 0) {
349
        /// special case: the root node is a leaf node and there is no need to handle underflow
350
351
352
        auto node = rootNode.leaf;
        assert(node != nullptr);
        result=eraseFromLeafNode(node, key);
353
      } else {
354
355
356
        auto node = rootNode.branch;
        assert(node != nullptr);
        result=eraseFromBranchNode(node, depth, key);
357
358
      }
    });
359
360
    return result;
  }
361

362
363
364
  /**
   * Recover the FPTree by iterating over the LeafList and using the recoveryInsert method.
   */
365
  void recover() {
366
    LOG("Starting RECOVERY of FPTree");
367
    pptr<LeafNode> currentLeaf = leafList;
368
    if (leafList == nullptr) {
369
      LOG("No data to recover FPTree");
370
      return;
371
    }
372
373
374
375
376
377
378
379
380
381
382
383
    /* counting leafs */
    auto leafs = 0u;
    while(currentLeaf != nullptr) {
      ++leafs;
      currentLeaf = currentLeaf->nextLeaf;
    }
    float x = std::log(leafs)/std::log(N+1);
    assert(x == int(x) && "Not supported for this amount of leafs, yet");

    /* actual recovery */
    currentLeaf = leafList;
    if (leafList->nextLeaf == nullptr) {
384
      /// The index has only one node, so the leaf node becomes the root node
385
386
      rootNode = leafList;
      depth = 0;
387
    } else {
388
      rootNode = newBranchNode();
389
      depth = 1;
390
391
      rootNode.branch->children[0] = currentLeaf;
      currentLeaf = currentLeaf->nextLeaf;
392
393
394
395
396
      while (currentLeaf != nullptr) {
        recoveryInsert(currentLeaf);
        currentLeaf = currentLeaf->nextLeaf;
      }
    }
397
    LOG("RECOVERY Done")
398
399
400
401
402
403
  }

  /**
   * Print the structure and content of the tree to stdout.
   */
  void print() const {
404
405
    if (depth == 0) printLeafNode(0, rootNode.leaf);
    else printBranchNode(0u, rootNode.branch);
406
407
408
409
410
411
412
413
  }

  /**
   * Perform a scan over all key-value pairs stored in the tree.
   * For each entry the given function @func is called.
   *
   * @param func the function called for each entry
   */
414
  void scan(ScanFunc func) const {
415
    /// we traverse to the leftmost leaf node
416
417
    auto node = rootNode;
    auto d = depth;
418
    while ( d-- > 0) node = node.branch->children[0];
419
    auto leaf = node.leaf;
420
    auto &leafRef = *leaf;
421
    while (leaf != nullptr) {
422
423
424
425
426
      /// for each key-value pair call func
      for (auto i = 0u; i < leafRef.numKeys.get_ro(); i++) {
        if (!leafRef.search.get_ro().b.test(i)) continue;
        const auto &key = leafRef.keys.get_ro()[i];
        const auto &val = leafRef.values.get_ro()[i];
427
428
        func(key, val);
      }
429
430
      /// move to the next leaf node
      leaf = leafRef.nextLeaf;
431
    }
432
  }
433
434

  /**
435
436
   * Perform a range scan over all elements within the range [minKey, maxKey] and for each element
   * call the given function @c func.
437
438
439
440
441
   *
   * @param minKey the lower boundary of the range
   * @param maxKey the upper boundary of the range
   * @param func the function called for each entry
   */
442
  void scan(const KeyType &minKey, const KeyType &maxKey, ScanFunc func) const {
443
444
445
446
    auto leaf = findLeafNode(minKey);

    bool higherThanMax = false;
    while (!higherThanMax && leaf != nullptr) {
447
448
      /// for each key-value pair within the range call func
      const auto leafRef = *leaf;
449
      for (auto i = 0u; i < M; i++) {
450
451
        if (!leafRef.search.get_ro().b.test(i)) continue;
        auto &key = leafRef.keys.get_ro()[i];
452
453
454
        if (key < minKey) continue;
        if (key > maxKey) { higherThanMax = true; continue; }

455
        auto &val = leafRef.values.get_ro()[i];
456
457
        func(key, val);
      }
458
459
      /// move to the next leaf node
      leaf = leafRef.nextLeaf;
460
    }
461
  }
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476

#ifndef UNIT_TESTS
  private:
#endif

  /**
   * Insert a (key, value) pair into the corresponding leaf node. It is the
   * responsibility of the caller to make sure that the node @c node is
   * the correct node. The key is inserted at the correct position.
   *
   * @param node the node where the key-value pair is inserted.
   * @param key the key to be inserted
   * @param val the value associated with the key
   * @param splitInfo information about a possible split of the node
   */
477
  bool insertInLeafNode(const pptr<LeafNode> &node, const KeyType &key,
478
      const ValueType &val, SplitInfo *splitInfo) {
479
    auto &nodeRef = *node;
480
481
482
    bool split = false;
    auto pos = lookupPositionInLeafNode(node, key);

483
    if (pos < M) {
484
      /// handle insert of duplicates
485
      nodeRef.values.get_rw()[pos] = val;
486
      return false;
487
488
    }
    pos = nodeRef.search.get_ro().getFreeZero();
489
    if (pos == M) {
490
491
492
493
494
495
496
497
498
      /* split the node */
      splitLeafNode(node, splitInfo);
      auto &splitRef = *splitInfo;
      auto sibling= splitRef.rightChild.leaf;
      auto &sibRef= *sibling;

      /* insert the new entry */
      if (key < splitRef.key)
        insertInLeafNodeAtPosition(node, nodeRef.search.get_ro().getFreeZero(), key, val);
499
      else
500
        insertInLeafNodeAtPosition(sibling, sibRef.search.get_ro().getFreeZero(), key, val);
501

502
503
      /* inform the caller about the split */
      splitRef.key = sibRef.keys.get_ro()[findMinKeyAtLeafNode(sibling)];
504
505
      split = true;
    } else {
506
      /* otherwise, we can simply insert the new entry at the given position */
507
508
509
510
511
      insertInLeafNodeAtPosition(node, pos, key, val);
    }
    return split;
  }

512
  void splitLeafNode(const pptr<LeafNode> &node, SplitInfo *splitInfo) {
513
514
515
516
517
518
519
      auto &nodeRef = *node;

      /* determine the split position by finding median in unsorted array of keys*/
      auto data = nodeRef.keys.get_ro();
      auto bAndKey = findSplitKey(data);
      auto &splitKey = bAndKey.second;

520
521
      /// copy leaf
      pptr<LeafNode> sibling = newLeafNode(node);
522
523
524
      auto &sibRef = *sibling;
      nodeRef.search.get_rw().b = bAndKey.first;
      sibRef.search.get_rw().b = bAndKey.first.flip();
525
      //PersistEmulation::writeBytes(sizeof(LeafNode) + ((2*M+7)>>3)); /// copy leaf + 2 bitmaps
526
527
528
529

      /* Alternative: move instead of complete copy *//*
      auto data = nodeRef.keys.get_ro();
      auto splitKey = ElementOfRankK::elementOfRankK((M+1)/2, data, 0, M);
530
      pptr<LeafNode> sibling = newLeafNode();
531
532
533
534
535
536
537
538
539
540
541
542
      auto &sibRef = *sibling;
      auto j = 0u;
      for(auto i = 0u; i < M; i++) {
        if(nodeRef.keys.get_ro()[i] > splitKey) {
          sibRef.keys.get_rw()[j] = nodeRef.keys.get_ro()[i];
          sibRef.values.get_rw()[j] = nodeRef.values.get_ro()[i];
          sibRef.search.get_rw().fp[j] = nodeRef.search.get_ro().fp[i];
          sibRef.search.get_rw().b.set(j);
          nodeRef.search.get_rw().b.reset(i);
          j++;
        }
      }
543
      PersistEmulation::writeBytes(j * (sizeof(KeyType) + sizeof(ValueType) + 1) + ((j*2+7)>>3)); /// j entries/hashes + j*2 bits*/
544

545
      /// setup the list of leaf nodes
546
547
548
549
550
551
552
553
      if (nodeRef.nextLeaf != nullptr) {
        sibRef.nextLeaf = nodeRef.nextLeaf;
        nodeRef.nextLeaf->prevLeaf = sibling;
      }
      nodeRef.nextLeaf = sibling;
      sibRef.prevLeaf = node;
      //PersistEmulation::writeBytes(16*2);

554
      /// set split information
555
556
557
558
559
560
      auto &splitRef = *splitInfo;
      splitRef.leftChild = node;
      splitRef.rightChild = sibling;
      splitRef.key = splitKey;
  }

561
562
563
564
565
566
567
568
569
570
571
572
  /**
   * Insert a (key, value) pair at the given position @c pos into the leaf node
   * @c node. The caller has to ensure that
   * - there is enough space to insert the element
   * - the key is inserted at the correct position according to the order of
   * keys
   *
   * @oaram node the leaf node where the element is to be inserted
   * @param pos the position in the leaf node (0 <= pos <= numKeys < M)
   * @param key the key of the element
   * @param val the actual value corresponding to the key
   */
573
  void insertInLeafNodeAtPosition(const pptr<LeafNode> &node, const unsigned int pos,
574
575
      const KeyType &key, const ValueType &val) {
    assert(pos < M);
576
    auto &nodeRef = *node;
577

578
579
580
    /// insert the new entry at the given position
    nodeRef.keys.get_rw()[pos] = key;
    nodeRef.values.get_rw()[pos] = val;
581
582
    //PersistEmulation::persistStall();

583
584
585
    /// set bit and hash
    nodeRef.search.get_rw().b.set(pos);
    nodeRef.search.get_rw().fp[pos] = fpHash(key);
586
587
588
    //PersistEmulation::persistStall();
    if(sizeof(LeafSearch) > 64) PersistEmulation::persistStall();
    //PersistEmulation::writeBytes(sizeof(KeyType) + sizeof(ValueType) + 2);
589
590
591
592
593
594
595
596
597
598
599
600
601
  }

  /**
   * Insert a (key, value) pair into the tree recursively by following the path
   * down to the leaf level starting at node @c node at depth @c depth.
   *
   * @param node the starting node for the insert
   * @param depth the current depth of the tree (0 == leaf level)
   * @param key the key of the element
   * @param val the actual value corresponding to the key
   * @param splitInfo information about the split
   * @return true if a split was performed
   */
602
603
  bool insertInBranchNode(BranchNode * const node, unsigned int d, const KeyType &key,
                          const ValueType &val, SplitInfo *splitInfo) {
604
605
    SplitInfo childSplitInfo;
    bool split = false, hasSplit = false;
606
    auto &nodeRef = *node;
607
608

    auto pos = lookupPositionInBranchNode(node, key);
609
610
611
    if (d == 1) {
      /// case #1: our children are leaf nodes
      auto child = nodeRef.children[pos].leaf;
612
      hasSplit = insertInLeafNode(child, key, val, &childSplitInfo);
613
    } else {
614
615
616
      /// case #2: our children are branch nodes
      auto child = nodeRef.children[pos].branch;
      hasSplit = insertInBranchNode(child, d - 1, key, val, &childSplitInfo);
617
618
    }
    if (hasSplit) {
619
620
621
622
      auto host = node;
      /// the child node was split, thus we have to add a new entry
      /// to our branch node
      if (nodeRef.numKeys == N) {
623
        splitBranchNode(node, childSplitInfo.key, splitInfo);
624
625
        const auto &splitRef = *splitInfo;
        host = (key < splitRef.key ? splitRef.leftChild : splitRef.rightChild).branch;
626
627
628
        split = true;
        pos = lookupPositionInBranchNode(host, key);
      }
629
630
631
632
633
634
635
636
      auto &hostRef = *host;
      if (pos < hostRef.numKeys) {
        /// if the child isn't inserted at the rightmost position
        /// then we have to make space for it
        hostRef.children[hostRef.numKeys + 1] = hostRef.children[hostRef.numKeys];
        for (auto i = hostRef.numKeys; i > pos; i--) {
          hostRef.children[i] = hostRef.children[i - 1];
          hostRef.keys[i] = hostRef.keys[i - 1];
637
638
        }
      }
639
640
641
642
643
      /// finally, add the new entry at the given position
      hostRef.keys[pos] = childSplitInfo.key;
      hostRef.children[pos] = childSplitInfo.leftChild;
      hostRef.children[pos + 1] = childSplitInfo.rightChild;
      hostRef.numKeys = hostRef.numKeys + 1;
644
645
646
647
648
    }
    return split;
  }

  /**
649
650
   * Split the given branch node @c node in the middle and move half of the keys/children to the new
   * sibling node.
651
652
653
654
655
   *
   * @param node the branch node to be split
   * @param splitKey the key on which the split of the child occured
   * @param splitInfo information about the split
   */
656
657
658
659
660
661
662
663
664
665
666
667
668
669
  void splitBranchNode(BranchNode * const node, const KeyType &splitKey, SplitInfo *splitInfo) {
    auto &nodeRef = *node;
    /// determine the split position
    auto middle = (N + 1) / 2;
    /// adjust the middle based on the key we have to insert
    if (splitKey > nodeRef.keys[middle]) middle++;

    /// move all entries behind this position to a new sibling node
    const auto sibling = newBranchNode();
    auto &sibRef = *sibling;
    sibRef.numKeys = nodeRef.numKeys - middle;
    for (auto i = 0u; i < sibRef.numKeys; i++) {
      sibRef.keys[i] = nodeRef.keys[middle + i];
      sibRef.children[i] = nodeRef.children[middle + i];
670
    }
671
672
    sibRef.children[sibRef.numKeys] = nodeRef.children[nodeRef.numKeys];
    nodeRef.numKeys = middle - 1;
673

674
675
676
677
    auto &splitRef = *splitInfo;
    splitRef.key = nodeRef.keys[middle - 1];
    splitRef.leftChild = node;
    splitRef.rightChild = sibling;
678
679
680
  }

  /**
681
682
683
   * Traverse the tree starting at the root until the leaf node is found that could contain the
   * given @key. Note, that always a leaf node is returned even if the key doesn't exist on this
   * node.
684
685
686
687
   *
   * @param key the key we are looking for
   * @return the leaf node that would store the key
   */
688
  pptr<LeafNode> findLeafNode(const KeyType &key) const {
689
690
    auto node = rootNode;
    auto d = depth;
691
    while (d-- > 0) {
692
693
694
695
696
697
698
      auto n = node.branch;
      auto pos = lookupPositionInBranchNode(n, key);
      node = n->children[pos];
    }
    return node.leaf;
  }
  /**
699
   * Lookup the search key @c key in the given leaf node and return the position.
700
   * If the search key was not found, then @c M is returned.
701
702
703
   *
   * @param node the leaf node where we search
   * @param key the search key
704
   * @return the position of the key  (or @c M if not found)
705
   */
706
707
  auto lookupPositionInLeafNode(const pptr<LeafNode> &node, const KeyType &key) const {
    auto pos = 0u;
Philipp Götze's avatar
Philipp Götze committed
708
    const auto &nodeRef = *node;
709
    const auto hash = fpHash(key);
Philipp Götze's avatar
Philipp Götze committed
710
711
    const auto &keys = nodeRef.keys.get_ro();
    const auto &search = nodeRef.search.get_ro();
712
    for (; pos < M ; pos++) {
713
714
715
      if(search.fp[pos] == hash &&
         search.b.test(pos) &&
         keys[pos] == key)
716
717
        break;
    }
718
719
720
721
    return pos;
  }

  /**
722
723
724
   * Lookup the search key @c key in the given branch node and return the position which is the
   * position in the list of keys + 1. in this way, the position corresponds to the position of the
   * child pointer in the array @children.
725
726
727
   * If the search key is less than the smallest key, then @c 0 is returned.
   * If the key is greater than the largest key, then @c numKeys is returned.
   *
728
   * @param node the branch node where we search
729
730
731
   * @param key the search key
   * @return the position of the key + 1 (or 0 or @c numKey)
   */
732
  auto  lookupPositionInBranchNode(const BranchNode * const node, const KeyType &key) const {
733
    const auto &num = node->numKeys;
734
735
    //auto pos = 0u;
    //const auto &keys = nodeRef.keys;
736
737
    //for (; pos < num && keys[pos] <= key; pos++);
    //return pos;
738
    return binarySearch(node, 0, num - 1, key);
739
740
  }

741
  auto binarySearch(const BranchNode * const node, int l, int r, const KeyType &key) const {
742
    auto pos = 0u;
743
    const auto &keys = node->keys;
744
745
    while (l <= r) {
      pos = (l + r) / 2;
746
747
      if (keys[pos] == key) return ++pos;
      if (keys[pos] < key) l = ++pos;
748
749
      else r = pos - 1;
    }
750
751
752
753
754
755
756
757
758
759
    return pos;
  }

  /**
   * Delete the element with the given key from the given leaf node.
   *
   * @param node the leaf node from which the element is deleted
   * @param key the key of the element to be deleted
   * @return true of the element was deleted
   */
760
  bool eraseFromLeafNode(const pptr<LeafNode> &node, const KeyType &key) {
761
    auto pos = lookupPositionInLeafNode(node, key);
762
    if (pos < M) {
763
      node->search.get_rw().b.reset(pos);
764
      return true;
765
    }
766
    return false;
767
768
769
  }

  /**
770
771
   * Delete an entry from the tree by recursively going down to the leaf level and handling the
   * underflows.
772
773
774
775
776
777
   *
   * @param node the current branch node
   * @param d the current depth of the traversal
   * @param key the key to be deleted
   * @return true if the entry was deleted
   */
778
779
  bool eraseFromBranchNode(BranchNode * const node, unsigned int d, const KeyType &key) {
    auto &nodeRef = *node;
780
781
    assert(d >= 1);
    bool deleted = false;
782
    /// try to find the branch
783
    auto pos = lookupPositionInBranchNode(node, key);
784
    if (d == 1) {
785
786
      /// the next level is the leaf level
      auto leaf = nodeRef.children[pos].leaf;
787
788
      assert(leaf != nullptr);
      deleted = eraseFromLeafNode(leaf, key);
789
      constexpr auto middle = (M + 1) / 2;
790
      if (leaf->search.get_ro().b.count() < middle) {
791
        /// handle underflow
792
        underflowAtLeafLevel(node, pos, leaf);
793
794
      }
    } else {
795
      auto child = nodeRef.children[pos].branch;
796
797
798
      deleted = eraseFromBranchNode(child, d - 1, key);

      pos = lookupPositionInBranchNode(node, key);
799
      constexpr auto middle = (N + 1) / 2;
800
      if (child->numKeys < middle) {
801
        /// handle underflow
802
        child = underflowAtBranchLevel(node, pos, child);
803
804
        if (d == depth && nodeRef.numKeys == 0) {
          /// special case: the root node is empty now
805
          rootNode = child;
806
          --depth;
807
808
809
810
811
812
813
        }
      }
    }
    return deleted;
  }

  /**
814
815
   * Handle the case that during a delete operation a underflow at node @c leaf occured.
   * If possible this is handled
816
817
818
819
   * (1) by rebalancing the elements among the leaf node and one of its siblings
   * (2) if not possible by merging with one of its siblings.
   *
   * @param node the parent node of the node where the underflow occured
820
   * @param pos the position of the child node @leaf in the @c children array of the branch node
821
822
   * @param leaf the node at which the underflow occured
   */
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
  void underflowAtLeafLevel(BranchNode * const node, unsigned int pos, const pptr<LeafNode> &leaf) {
    auto &nodeRef = *node;
    auto &leafRef = *leaf;
    assert(pos <= nodeRef.numKeys);
    constexpr auto middle = (M + 1) / 2;
    /// 1. we check whether we can rebalance with one of the siblings but only if both nodes have
    ///    the same direct parent
    if (pos > 0 && leafRef.prevLeaf->search.get_ro().b.count() > middle) {
      /// we have a sibling at the left for rebalancing the keys
      balanceLeafNodes(leafRef.prevLeaf, leaf);

      nodeRef.keys[pos - 1] = leafRef.keys.get_ro()[findMinKeyAtLeafNode(leaf)];
    } else if (pos < nodeRef.numKeys && leafRef.nextLeaf->search.get_ro().b.count() > middle) {
      /// we have a sibling at the right for rebalancing the keys
      balanceLeafNodes(leafRef.nextLeaf, leaf);

      nodeRef.keys[pos] = leafRef.nextLeaf->keys.get_ro()[findMinKeyAtLeafNode(leafRef.nextLeaf)];
840
    } else {
841
842
843
844
845
      /// 2. if this fails we have to merge two leaf nodes but only if both nodes have the same
      ///    direct parent
      pptr<LeafNode> survivor = nullptr;
      if (pos > 0 && leafRef.prevLeaf->search.get_ro().b.count() <= middle) {
        survivor = mergeLeafNodes(leafRef.prevLeaf, leaf);
846
        deleteLeafNode(leaf);
847
848
849
      } else if (pos < nodeRef.numKeys && leafRef.nextLeaf->search.get_ro().b.count() <= middle) {
        /// because we update the pointers in mergeLeafNodes we keep it here
        auto l = leafRef.nextLeaf;
850
851
        survivor = mergeLeafNodes(leaf, l);
        deleteLeafNode(l);
852
      } else {
853
        /// this shouldn't happen?!
854
855
        assert(false);
      }
856
      if (nodeRef.numKeys > 1) {
857
        if (pos > 0) pos--;
858
859
860
861
        /// just remove the child node from the current branch node
        for (auto i = pos; i < nodeRef.numKeys - 1; i++) {
          nodeRef.keys[i] = nodeRef.keys[i + 1];
          nodeRef.children[i + 1] = nodeRef.children[i + 2];
862
        }
863
864
        nodeRef.children[pos] = survivor;
        --nodeRef.numKeys;
865
      } else {
866
867
        /// This is a special case that happens only if the current node is the root node.
        /// Now, we have to replace the branch root node by a leaf node.
868
869
        rootNode = survivor;
        --depth;
870
871
      }
    }
872
  }
873
874

  /**
875
876
877
   * Handle the case that during a delete operation a underflow at node @c child occured where @c
   * node is the parent node. If possible this is handled
   * (1) by rebalancing the elements among the node @c child and one of its siblings
878
879
880
   * (2) if not possible by merging with one of its siblings.
   *
   * @param node the parent node of the node where the underflow occured
881
   * @param pos the position of the child node @child in the @c children array of the branch node
882
883
884
   * @param child the node at which the underflow occured
   * @return the (possibly new) child node (in case of a merge)
   */
885
886
  BranchNode* underflowAtBranchLevel(BranchNode * const node, unsigned int pos,
                                     BranchNode * const child) {
887
888
    assert(node != nullptr);
    assert(child != nullptr);
889
890
    auto &nodeRef = *node;
    auto &childRef = *child;
891

892
893
894
    auto newChild = child;
    constexpr auto middle = (N + 1) / 2;
    /// 1. we check whether we can rebalance with one of the siblings
895
896

    if (pos > 0 &&
897
898
899
        nodeRef.children[pos - 1].branch->numKeys >middle) {
      /// we have a sibling at the left for rebalancing the keys
      auto sibling = nodeRef.children[pos - 1].branch;
900
      balanceBranchNodes(sibling, child, node, pos - 1);
901
      /// nodeRef.keys.get_rw()[pos] = child->keys.get_ro()[0];
902
      return newChild;
903
904
905
    } else if (pos < nodeRef.numKeys && nodeRef.children[pos + 1].branch->numKeys > middle) {
      /// we have a sibling at the right for rebalancing the keys
      auto sibling = nodeRef.children[pos + 1].branch;
906
907
908
      balanceBranchNodes(sibling, child, node, pos);
      return newChild;
    } else {
909
      /// 2. if this fails we have to merge two branch nodes
910
911
912
      BranchNode *lSibling = nullptr, *rSibling = nullptr;
      unsigned int prevKeys = 0, nextKeys = 0;
      if (pos > 0) {
913
        lSibling = nodeRef.children[pos - 1].branch;
914
915
        prevKeys = lSibling->numKeys;
      }
916
917
      if (pos < nodeRef.numKeys) {
        rSibling = nodeRef.children[pos + 1].branch;
918
919
920
921
922
        nextKeys = rSibling->numKeys;
      }
      BranchNode *witnessNode = nullptr;
      auto ppos = pos;
      if (prevKeys > 0) {
923
        mergeBranchNodes(lSibling, nodeRef.keys[pos - 1], child);
924
925
926
        ppos = pos - 1;
        witnessNode = child;
        newChild = lSibling;
927
        /// pos -= 1;
928
      } else if (nextKeys > 0) {
929
        mergeBranchNodes(child, nodeRef.keys[pos], rSibling);
930
931
        witnessNode = rSibling;
      } else
932
        /// shouldn't happen
933
        assert(false);
934
935
936
      /// remove nodeRef.keys.get_ro()[pos] from node
      for (auto i = ppos; i < nodeRef.numKeys - 1; i++) {
        nodeRef.keys[i] = nodeRef.keys[i + 1];
937
938
      }
      if (pos == 0) pos++;
939
940
941
      for (auto i = pos; i < nodeRef.numKeys; i++) {
        if (i + 1 <= nodeRef.numKeys) {
          nodeRef.children[i] = nodeRef.children[i + 1];
942
943
        }
      }
944
      nodeRef.numKeys--;
945
946
947
948
949
950
      deleteBranchNode(witnessNode);
      return newChild;
    }
  }

  /**
951
952
953
   * Redistribute (key, value) pairs from the leaf node @c donor to the leaf node @c receiver such
   * that both nodes have approx. the same number of elements. This method is used in case of an
   * underflow situation of a leaf node.
954
955
956
957
   *
   * @param donor the leaf node from which the elements are taken
   * @param receiver the sibling leaf node getting the elements from @c donor
   */
958
959
960
961
962
  void balanceLeafNodes(const pptr<LeafNode> &donor, const pptr<LeafNode> &receiver) {
    auto &donorRef = *donor;
    auto &receiverRef = *receiver;
    const auto dNumKeys = donorRef.search.get_ro().b.count();
    const auto rNumKeys = receiverRef.search.get_ro().b.count();
963
964
965
    assert(dNumKeys > rNumKeys);
    unsigned int balancedNum = (dNumKeys + rNumKeys) / 2;
    unsigned int toMove = dNumKeys - balancedNum;
966
967
    if (toMove == 0) return;

968
969
    if (donorRef.keys.get_ro()[0] < receiverRef.keys.get_ro()[0]) {
      /// move to a node with larger keys
970
971
      for (auto i = 0u; i < toMove; i++) {
        const auto max = findMaxKeyAtLeafNode(donor);
972
        const auto pos = receiverRef.search.get_ro().getFreeZero();
973

974
975
976
977
978
        receiverRef.search.get_rw().b.set(pos);
        receiverRef.search.get_rw().fp[pos] = fpHash(donorRef.keys.get_ro()[max]);
        receiverRef.keys.get_rw()[pos] = donorRef.keys.get_ro()[max];
        receiverRef.values.get_rw()[pos] = donorRef.values.get_ro()[max];
        donorRef.search.get_rw().b.reset(max);
979
980
      }
    } else {
981
      /// move to a node with smaller keys
982
983
      for (auto i = 0u; i < toMove; i++) {
        const auto min = findMinKeyAtLeafNode(donor);
984
        const auto pos = receiverRef.search.get_ro().getFreeZero();
985

986
987
988
989
990
        receiverRef.search.get_rw().b.set(pos);
        receiverRef.search.get_rw().fp[pos] = fpHash(donorRef.keys.get_ro()[min]);
        receiverRef.keys.get_rw()[pos] = donorRef.keys.get_ro()[min];
        receiverRef.values.get_rw()[pos] = donorRef.values.get_ro()[min];
        donorRef.search.get_rw().b.reset(min);
991
      }
992
    }
993
  }
994

995
  /**
996
   * Find position of the minimum key in unsorted leaf.
997
   *
998
999
1000
   * @param node the leaf node to find the minimum key in
   * @return position of the minimum key
   */
For faster browsing, not all history is shown. View entire blame