FPTree.hpp 46.3 KB
Newer Older
1
/*
2
 * Copyright (C) 2017-2020 DBIS Group - TU Ilmenau, All Rights Reserved.
3
 *
4
 * This file is part of our NVM-based Data Structures repository.
5
 *
6
7
8
 * This program is free software: you can redistribute it and/or modify it under the terms of the
 * GNU General Public License as published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
9
 *
10
11
12
 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 * See the GNU General Public License for more details.
13
 *
14
15
 * You should have received a copy of the GNU General Public License along with this program.
 * If not, see <http://www.gnu.org/licenses/>.
16
17
18
19
20
21
 */

#ifndef DBIS_FPTree_hpp_
#define DBIS_FPTree_hpp_

#include <array>
22
#include <bitset>
23
#include <cmath>
24
25
#include <iostream>

26
#include <libpmemobj/ctl.h>
27
28
29
30
31
32
#include <libpmemobj++/make_persistent.hpp>
#include <libpmemobj++/p.hpp>
#include <libpmemobj++/persistent_ptr.hpp>
#include <libpmemobj++/transaction.hpp>
#include <libpmemobj++/utils.hpp>

33
#include "config.h"
34
#include "utils/BitOperations.hpp"
35
#include "utils/PersistEmulation.hpp"
36
#include "utils/SearchFunctions.hpp"
37

38
namespace dbis::pbptrees {
39

40
using pmem::obj::allocation_flag;
41
42
43
44
45
using pmem::obj::delete_persistent;
using pmem::obj::make_persistent;
using pmem::obj::p;
using pmem::obj::persistent_ptr;
using pmem::obj::transaction;
46
47
template <typename Object>
using pptr = persistent_ptr<Object>;
48
49
50
51
52
53
54
55
56
57
58

/**
 * A persistent memory implementation of a FPTree.
 *
 * @tparam KeyType the data type of the key
 * @tparam ValueType the data type of the values associated with the key
 * @tparam N the maximum number of keys on a branch node
 * @tparam M the maximum number of keys on a leaf node
 */
template<typename KeyType, typename ValueType, int N, int M>
class FPTree {
59
  /// we need at least three keys on a branch node to be able to split
60
  static_assert(N > 2, "number of branch keys has to be >2.");
61
62
  /// we need an even order for branch nodes to be able to merge
  static_assert(N % 2 == 0, "order of branch nodes must be even.");
63
  /// we need at least one key on a leaf node
64
65
66
67
68
69
70
71
  static_assert(M > 0, "number of leaf keys should be >0.");

#ifndef UNIT_TESTS
  private:
#else
  public:
#endif

72
  /// Forward declarations
73
74
75
  struct LeafNode;
  struct BranchNode;

76
77
  struct Node {
    Node() : tag(BLANK) {};
78

79
    Node(const pptr<LeafNode> &leaf_) : tag(LEAF), leaf(leaf_) {};
80

81
    Node(BranchNode *branch_) : tag(BRANCH), branch(branch_) {};
82

83
    Node(const Node &other) { copy(other); };
84

85
    void copy(const Node &other) throw() {
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
      tag = other.tag;

      switch (tag) {
        case LEAF: {
          leaf = other.leaf;
          break;
        }
        case BRANCH: {
          branch = other.branch;
          break;
        }
        default: break;
      }
    }

101
    Node &operator=(Node other) {
102
103
104
105
106
      copy(other);
      return *this;
    }

    enum NodeType {
107
      BLANK, LEAF, BRANCH
108
109
    } tag;
    union {
110
      pptr<LeafNode> leaf;
111
112
113
114
115
116
117
      BranchNode *branch;
    };
  };

  /**
   * A structure for representing a leaf node of a B+ tree.
   */
118
119
120
121
122
  struct alignas(64) LeafNode {

    static constexpr auto NUM_KEYS = M;
    using KEY_TYPE = KeyType;

123
124
125
    /**
     * Constructor for creating a new empty leaf node.
     */
126
127
    LeafNode() : nextLeaf(nullptr), prevLeaf(nullptr) {}

128
129
130
131
132
133
134
135
136
137
138
139
140
    static constexpr auto BitsetSize = ((M + 63) / 64) * 8;  ///< number * size of words
    static constexpr auto HashSize = ((M + 7) / 8) * 8;  ///< round to 8 Byte
    static constexpr auto SearchSize = BitsetSize + HashSize + 32;
    static constexpr auto PaddingSize = (64 - SearchSize % 64) % 64;

    // p<LeafSearch> search;               ///< helper structure for faster searches
    p<std::bitset<M>> bits;              ///< bitset for valid entries
    p<std::array<uint8_t, M>> fp;        ///< fingerprint array (n & 0xFF)
    pptr<LeafNode> nextLeaf;             ///< pointer to the subsequent sibling
    pptr<LeafNode> prevLeaf;             ///< pointer to the preceeding sibling
    char padding[PaddingSize];           ///< padding to align keys to 64 bytes
    p<std::array<KeyType, M>> keys;      ///< the actual keys
    p<std::array<ValueType, M>> values;  ///< the actual values
141
142
143
144
145
  };

  /**
   * A structure for representing an branch node (branch node) of a B+ tree.
   */
146
147
148
149
  struct alignas(64) BranchNode {

    static constexpr auto NUM_KEYS = N;

150
151
152
153
154
    /**
     * Constructor for creating a new empty branch node.
     */
    BranchNode() : numKeys(0) {}

155
156
157
    unsigned int numKeys;             ///< the number of currently stored keys
    std::array<KeyType, N> keys;      ///< the actual keys
    std::array<Node, N + 1> children; ///< pointers to child nodes (BranchNode or LeafNode)
158
159
160
161
162
  };

  /**
   * Create a new empty leaf node
   */
163
  pptr<LeafNode> newLeafNode() {
164
    auto pop = pmem::obj::pool_by_vptr(this);
165
    pptr<LeafNode> newNode = nullptr;
166
167
168
    transaction::run(pop, [&] {
      newNode = make_persistent<LeafNode>(allocation_flag::class_id(alloc_class.class_id));
    });
169
170
171
    return newNode;
  }

172
  pptr<LeafNode> newLeafNode(const pptr<LeafNode> &other) {
173
    auto pop = pmem::obj::pool_by_vptr(this);
174
    pptr<LeafNode> newNode = nullptr;
175
176
177
    transaction::run(pop, [&] {
      newNode = make_persistent<LeafNode>(allocation_flag::class_id(alloc_class.class_id), *other);
    });
178
179
180
    return newNode;
  }

181
  void deleteLeafNode(const pptr<LeafNode> &node) {
182
    auto pop = pmem::obj::pool_by_vptr(this);
183
    transaction::run(pop, [&] { delete_persistent<LeafNode>(node); });
184
185
186
187
188
189
  }

  /**
   * Create a new empty branch node
   */
  BranchNode *newBranchNode() {
190
    return new BranchNode();
191
192
  }

193
  void deleteBranchNode(const BranchNode * const node) {
194
    delete node;
195
196
197
  }

  /**
198
   * A structure for passing information about a node split to the caller.
199
200
   */
  struct SplitInfo {
201
202
203
    KeyType key;     ///< the key at which the node was split
    Node leftChild;  ///< the resulting lhs child node
    Node rightChild; ///< the resulting rhs child node
204
205
  };

206
207
208
209
210
211
  static constexpr pobj_alloc_class_desc AllocClass{256, 64, 1, POBJ_HEADER_COMPACT};
  pobj_alloc_class_desc alloc_class;
  unsigned int depth;       /**< the depth of the tree, i.e. the number of levels
                                 (0 => rootNode is LeafNode) */
  Node rootNode;            ///< pointer to the root node
  pptr<LeafNode> leafList;  ///< pointer to the leaf at the most left position (for recovery)
212

213
214
215
  /* -------------------------------------------------------------------------------------------- */

 public:
216
217
218
219
  /**
   * Iterator for iterating over the leaf nodes
   */
  class iterator {
220
221
    pptr<LeafNode> currentNode;
    unsigned int currentPosition;
222
223
224

    public:
    iterator() : currentNode(nullptr), currentPosition(0) {}
225
226
    iterator(const Node &root, unsigned int d) {
      /// traverse to left-most key
227
      auto node = root;
228
      while (d-- > 0) node = node.branch->children[0];
229
230
      currentNode = node.leaf;
      currentPosition = 0;
231
      const auto &nodeBits = currentNode->bits.get_ro();
232
      /// Can not overflow as there are at least M/2 entries
233
      while(!nodeBits.test(currentPosition)) ++currentPosition;
234
235
236
    }

    iterator& operator++() {
237
      if (currentPosition >= M-1) {
238
239
        currentNode = currentNode->nextLeaf;
        currentPosition = 0;
240
        if (currentNode == nullptr) return *this;
241
        const auto &nodeBits = currentNode->bits.get_ro();
242
        while(!nodeBits.test(currentPosition)) ++currentPosition;
243
      } else if (!currentNode->bits.get_ro().test(++currentPosition)) ++(*this);
244
245
246
      return *this;
    }

247
248
249
250
251
252
253
254
255
256
257
    iterator operator++(int) {
      iterator retval = *this;
      ++(*this);
      return retval;
    }

    bool operator==(iterator other) const {
      return (currentNode == other.currentNode && currentPosition == other.currentPosition);
    }

    bool operator!=(iterator other) const { return !(*this == other); }
258
259

    std::pair<KeyType, ValueType> operator*() {
260
261
262
      const auto nodeRef = *currentNode;
      return std::make_pair(nodeRef.keys.get_ro()[currentPosition],
                            nodeRef.values.get_ro()[currentPosition]);
263
264
    }

265
    /// iterator traits
266
267
268
269
270
271
272
273
    using difference_type = long;
    using value_type = std::pair<KeyType, ValueType>;
    using pointer = const std::pair<KeyType, ValueType>*;
    using reference = const std::pair<KeyType, ValueType>&;
    using iterator_category = std::forward_iterator_tag;
  };
  iterator begin() { return iterator(rootNode, depth); }
  iterator end() { return iterator(); }
274
275
276
277
278
279
280
281

  /* -------------------------------------------------------------------------------------------- */

  /**
   * Typedef for a function passed to the scan method.
   */
  using ScanFunc = std::function<void(const KeyType &key, const ValueType &val)>;

282
283
284
  /**
   * Constructor for creating a new  tree.
   */
285
286
  explicit FPTree(struct pobj_alloc_class_desc _alloc) : depth(0), alloc_class(_alloc) {
  // FPTree() {
287
    rootNode = newLeafNode();
288
289
    leafList = rootNode.leaf;
    depth = 0;
290
291
    LOG("created new FPTree with sizeof(BranchNode) = "
        << sizeof(BranchNode) << ", sizeof(LeafNode) = " << sizeof(LeafNode));
292
293
294
295
296
  }

  /**
   * Destructor for the tree. Should delete all allocated nodes.
   */
297
  ~FPTree() {}
298
299

  /**
300
301
   * Insert an element (a key-value pair) into the tree. If the key @c key already exists, the
   * corresponding value is replaced by @c val.
302
303
304
305
306
   *
   * @param key the key of the element to be inserted
   * @param val the value that is associated with the key
   */
  void insert(const KeyType &key, const ValueType &val) {
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
    SplitInfo splitInfo;

    bool wasSplit = false;
    if (depth == 0) {
      /// the root node is a leaf node
      auto n = rootNode.leaf;
      wasSplit = insertInLeafNode(n, key, val, &splitInfo);
    } else {
      /// the root node is a branch node
      auto n = rootNode.branch;
      wasSplit = insertInBranchNode(n, depth, key, val, &splitInfo);
    }
    if (wasSplit) {
      /// we had an overflow in the node and therefore the node is split
      const auto root = newBranchNode();
      auto &rootRef = *root;
      rootRef.keys[0] = splitInfo.key;
      rootRef.children[0] = splitInfo.leftChild;
      rootRef.children[1] = splitInfo.rightChild;
      ++rootRef.numKeys;
      rootNode.branch = root;
      ++depth;
    }
330
331
332
  }

  /**
333
   * Find the given @c key in the  tree and if found return the corresponding value.
334
335
   *
   * @param key the key we are looking for
336
   * @param[out] val a pointer to memory where the value is stored if the key was found
337
338
339
340
   * @return true if the key was found, false otherwise
   */
  bool lookup(const KeyType &key, ValueType *val)  {
    assert(val != nullptr);
341
342
    const auto leaf = findLeafNode(key);
    const auto pos = lookupPositionInLeafNode(leaf, key);
343
    if (pos < M) {
344
345
346
      /// we found it!
      *val = leaf->values.get_ro()[pos];
      return true;
347
    }
348
    return false;
349
350
351
352
353
354
355
356
357
358
  }

  /**
   * Delete the entry with the given key @c key from the tree.
   *
   * @param key the key of the entry to be deleted
   * @return true if the key was found and deleted
   */
  bool erase(const KeyType &key) {
    bool result;
359
360
361
362
363
364
365
366
367
368
    if (depth == 0) {
      /// special case: the root node is a leaf node and there is no need to handle underflow
      auto node = rootNode.leaf;
      assert(node != nullptr);
      result=eraseFromLeafNode(node, key);
    } else {
      auto node = rootNode.branch;
      assert(node != nullptr);
      result=eraseFromBranchNode(node, depth, key);
    }
369
370
    return result;
  }
371

372
373
374
  /**
   * Recover the FPTree by iterating over the LeafList and using the recoveryInsert method.
   */
375
  void recover() {
376
    LOG("Starting RECOVERY of FPTree");
377
    pptr<LeafNode> currentLeaf = leafList;
378
    if (leafList == nullptr) {
379
      LOG("No data to recover FPTree");
380
      return;
381
    }
382
383
384
385
386
387
    /* counting leafs */
    auto leafs = 0u;
    while(currentLeaf != nullptr) {
      ++leafs;
      currentLeaf = currentLeaf->nextLeaf;
    }
388
389
    // float x = std::log(leafs)/std::log(N+1);
    // assert(x == int(x) && "Not supported for this amount of leafs, yet");
390
391
392
393

    /* actual recovery */
    currentLeaf = leafList;
    if (leafList->nextLeaf == nullptr) {
394
      /// The index has only one node, so the leaf node becomes the root node
395
396
      rootNode = leafList;
      depth = 0;
397
    } else {
398
      rootNode = newBranchNode();
399
      depth = 1;
400
401
      rootNode.branch->children[0] = currentLeaf;
      currentLeaf = currentLeaf->nextLeaf;
402
403
404
405
406
      while (currentLeaf != nullptr) {
        recoveryInsert(currentLeaf);
        currentLeaf = currentLeaf->nextLeaf;
      }
    }
407
    LOG("RECOVERY Done")
408
409
410
411
412
413
  }

  /**
   * Print the structure and content of the tree to stdout.
   */
  void print() const {
414
415
    if (depth == 0) printLeafNode(0, rootNode.leaf);
    else printBranchNode(0u, rootNode.branch);
416
417
418
419
420
421
422
423
  }

  /**
   * Perform a scan over all key-value pairs stored in the tree.
   * For each entry the given function @func is called.
   *
   * @param func the function called for each entry
   */
424
  void scan(ScanFunc func) const {
425
    /// we traverse to the leftmost leaf node
426
427
    auto node = rootNode;
    auto d = depth;
428
    while ( d-- > 0) node = node.branch->children[0];
429
430
    auto leaf = node.leaf;
    while (leaf != nullptr) {
431
      auto &leafRef = *leaf;
432
      /// for each key-value pair call func
433
      const auto &leafBits = leafRef.bits.get_ro();
434
435
436
437
438
439
      const auto &leafKeys = leafRef.keys.get_ro();
      const auto &leafValues = leafRef.values.get_ro();
      for (auto i = 0u; i < M; i++) {
        if (!leafBits.test(i)) continue;
        const auto &key = leafKeys[i];
        const auto &val = leafValues[i];
440
441
        func(key, val);
      }
442
443
      /// move to the next leaf node
      leaf = leafRef.nextLeaf;
444
    }
445
  }
446
447

  /**
448
449
   * Perform a range scan over all elements within the range [minKey, maxKey] and for each element
   * call the given function @c func.
450
451
452
453
454
   *
   * @param minKey the lower boundary of the range
   * @param maxKey the upper boundary of the range
   * @param func the function called for each entry
   */
455
  void scan(const KeyType &minKey, const KeyType &maxKey, ScanFunc func) const {
456
457
458
459
    auto leaf = findLeafNode(minKey);

    bool higherThanMax = false;
    while (!higherThanMax && leaf != nullptr) {
460
461
      /// for each key-value pair within the range call func
      const auto leafRef = *leaf;
462
      const auto &leafBits = leafRef.bits.get_ro();
463
464
      const auto &leafKeys = leafRef.keys.get_ro();
      const auto &leafValues = leafRef.values.get_ro();
465
      for (auto i = 0u; i < M; i++) {
466
467
        if (!leafBits.test(i)) continue;
        auto &key = leafKeys[i];
468
469
470
        if (key < minKey) continue;
        if (key > maxKey) { higherThanMax = true; continue; }

471
        auto &val = leafValues[i];
472
473
        func(key, val);
      }
474
475
      /// move to the next leaf node
      leaf = leafRef.nextLeaf;
476
    }
477
  }
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492

#ifndef UNIT_TESTS
  private:
#endif

  /**
   * Insert a (key, value) pair into the corresponding leaf node. It is the
   * responsibility of the caller to make sure that the node @c node is
   * the correct node. The key is inserted at the correct position.
   *
   * @param node the node where the key-value pair is inserted.
   * @param key the key to be inserted
   * @param val the value associated with the key
   * @param splitInfo information about a possible split of the node
   */
493
  bool insertInLeafNode(const pptr<LeafNode> &node, const KeyType &key,
494
      const ValueType &val, SplitInfo *splitInfo) {
495
    auto &nodeRef = *node;
496
497
498
    bool split = false;
    auto pos = lookupPositionInLeafNode(node, key);

499
    if (pos < M) {
500
      /// handle insert of duplicates
501
      nodeRef.values.get_rw()[pos] = val;
502
      return false;
503
    }
504
    pos = BitOperations::getFreeZero(nodeRef.bits.get_ro());
505
    if (pos == M) {
506
507
508
509
510
511
512
      /* split the node */
      splitLeafNode(node, splitInfo);
      auto &splitRef = *splitInfo;
      auto sibling= splitRef.rightChild.leaf;
      auto &sibRef= *sibling;

      /* insert the new entry */
513
      if (key > splitRef.key) {
514
        insertInLeafNodeAtPosition(sibling, BitOperations::getFreeZero(sibRef.bits.get_ro()), key, val);
515
      } else {
516
        if (key > nodeRef.keys.get_ro()[findMaxKeyPos(nodeRef.keys.get_ro(), nodeRef.bits.get_ro())]) {
517
          /// Special case: new key would be the middle, thus must be right
518
          insertInLeafNodeAtPosition(sibling, BitOperations::getFreeZero(sibRef.bits.get_ro()), key, val);
519
520
          splitRef.key = key;
        } else {
521
          insertInLeafNodeAtPosition(node, BitOperations::getFreeZero(nodeRef.bits.get_ro()), key, val);
522
523
        }
      }
524
      /* inform the caller about the split */
525
526
      split = true;
    } else {
527
      /* otherwise, we can simply insert the new entry at the given position */
528
529
530
531
532
      insertInLeafNodeAtPosition(node, pos, key, val);
    }
    return split;
  }

533
534
535
536
537
538
539
  /**
   * Split the given leaf node @c node in the middle and move half of the keys/children to the new
   * sibling node.
   *
   * @param node the leaf node to be split
   * @param splitInfo[out] information about the split
   */
540
  void splitLeafNode(const pptr<LeafNode> &node, SplitInfo *splitInfo) {
541
542
543
    auto &nodeRef = *node;

    /* determine the split position by finding median in unsorted array of keys*/
544
545
    const auto [bitmap, splitPos] = findSplitKey<KeyType, M>(nodeRef.keys.get_ro().data());
    const auto &splitKey = nodeRef.keys.get_ro()[splitPos];
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576

    /// copy leaf
    /*
    const auto sibling = newLeafNode(node);
    auto &sibRef = *sibling;
    nodeRef.bits.get_rw() = bitmap;
    sibRef.bits.get_rw() = bitmap.flip();
    PersistEmulation::writeBytes<sizeof(LeafNode) + ((2*M+7)>>3)>(); /// copy leaf + 2 bitmaps
    */

    /// Alternative: move instead of complete copy
    ///*
    const auto sibling = newLeafNode();
    auto &sibRef = *sibling;
    auto &sibBits = sibRef.bits.get_rw();
    auto &sibHashs = sibRef.fp.get_rw();
    auto &sibKeys = sibRef.keys.get_rw();
    auto &sibValues = sibRef.values.get_rw();
    auto &nodeBits = nodeRef.bits.get_rw();
    const auto &nodeHashs = nodeRef.fp.get_ro();
    const auto &nodeKeys = nodeRef.keys.get_ro();
    const auto &nodeValues = nodeRef.values.get_ro();
    auto j = 0u;
    for (auto i = 0u; i < M; i++) {
      if (nodeKeys[i] >= splitKey) {
        sibKeys[j] = nodeKeys[i];
        sibValues[j] = nodeValues[i];
        sibHashs[j] = nodeHashs[i];
        sibBits.set(j);
        nodeBits.reset(i);
        j++;
577
      }
578
579
580
581
582
583
584
585
586
587
588
589
590
591
    }
    PersistEmulation::writeBytes(j * (sizeof(KeyType) + sizeof(ValueType) + 1) +
                                 ((j * 2 + 7) >> 3));  /// j entries/hashes + j*2 bits
    // */

    /// setup the list of leaf nodes
    if (nodeRef.nextLeaf != nullptr) {
      sibRef.nextLeaf = nodeRef.nextLeaf;
      nodeRef.nextLeaf->prevLeaf = sibling;
      PersistEmulation::writeBytes<16 * 2>();
    }
    nodeRef.nextLeaf = sibling;
    sibRef.prevLeaf = node;
    PersistEmulation::writeBytes<16 * 2>();
592

593
594
595
596
597
    /// set split information
    auto &splitRef = *splitInfo;
    splitRef.leftChild = node;
    splitRef.rightChild = sibling;
    splitRef.key = splitKey;
598
599
  }

600
601
602
603
604
605
606
607
608
609
610
611
  /**
   * Insert a (key, value) pair at the given position @c pos into the leaf node
   * @c node. The caller has to ensure that
   * - there is enough space to insert the element
   * - the key is inserted at the correct position according to the order of
   * keys
   *
   * @oaram node the leaf node where the element is to be inserted
   * @param pos the position in the leaf node (0 <= pos <= numKeys < M)
   * @param key the key of the element
   * @param val the actual value corresponding to the key
   */
612
  void insertInLeafNodeAtPosition(const pptr<LeafNode> &node, const unsigned int pos,
613
                                  const KeyType &key, const ValueType &val) {
614
    assert(pos < M);
615
    auto &nodeRef = *node;
616

617
618
619
    /// insert the new entry at the given position
    nodeRef.keys.get_rw()[pos] = key;
    nodeRef.values.get_rw()[pos] = val;
620

621
    /// set bit and hash
622
623
    nodeRef.bits.get_rw().set(pos);
    nodeRef.fp.get_rw()[pos] = fpHash(key);
624
    PersistEmulation::writeBytes<sizeof(KeyType) + sizeof(ValueType) + 2>();
625
626
627
628
629
630
631
  }

  /**
   * Insert a (key, value) pair into the tree recursively by following the path
   * down to the leaf level starting at node @c node at depth @c depth.
   *
   * @param node the starting node for the insert
632
   * @param d the current depth of the tree (0 == leaf level)
633
634
635
636
637
   * @param key the key of the element
   * @param val the actual value corresponding to the key
   * @param splitInfo information about the split
   * @return true if a split was performed
   */
638
639
  bool insertInBranchNode(BranchNode * const node, unsigned int d, const KeyType &key,
                          const ValueType &val, SplitInfo *splitInfo) {
640
641
    SplitInfo childSplitInfo;
    bool split = false, hasSplit = false;
642
    auto &nodeRef = *node;
643
644

    auto pos = lookupPositionInBranchNode(node, key);
645
646
647
    if (d == 1) {
      /// case #1: our children are leaf nodes
      auto child = nodeRef.children[pos].leaf;
648
      hasSplit = insertInLeafNode(child, key, val, &childSplitInfo);
649
    } else {
650
651
652
      /// case #2: our children are branch nodes
      auto child = nodeRef.children[pos].branch;
      hasSplit = insertInBranchNode(child, d - 1, key, val, &childSplitInfo);
653
654
    }
    if (hasSplit) {
655
656
657
658
      auto host = node;
      /// the child node was split, thus we have to add a new entry
      /// to our branch node
      if (nodeRef.numKeys == N) {
659
        splitBranchNode(node, childSplitInfo.key, splitInfo);
660
661
        const auto &splitRef = *splitInfo;
        host = (key < splitRef.key ? splitRef.leftChild : splitRef.rightChild).branch;
662
663
664
        split = true;
        pos = lookupPositionInBranchNode(host, key);
      }
665
666
667
668
669
670
671
672
      auto &hostRef = *host;
      if (pos < hostRef.numKeys) {
        /// if the child isn't inserted at the rightmost position
        /// then we have to make space for it
        hostRef.children[hostRef.numKeys + 1] = hostRef.children[hostRef.numKeys];
        for (auto i = hostRef.numKeys; i > pos; i--) {
          hostRef.children[i] = hostRef.children[i - 1];
          hostRef.keys[i] = hostRef.keys[i - 1];
673
674
        }
      }
675
676
677
678
679
      /// finally, add the new entry at the given position
      hostRef.keys[pos] = childSplitInfo.key;
      hostRef.children[pos] = childSplitInfo.leftChild;
      hostRef.children[pos + 1] = childSplitInfo.rightChild;
      hostRef.numKeys = hostRef.numKeys + 1;
680
681
682
683
684
    }
    return split;
  }

  /**
685
686
   * Split the given branch node @c node in the middle and move half of the keys/children to the new
   * sibling node.
687
688
689
690
691
   *
   * @param node the branch node to be split
   * @param splitKey the key on which the split of the child occured
   * @param splitInfo information about the split
   */
692
693
694
695
696
697
698
699
700
701
702
703
704
705
  void splitBranchNode(BranchNode * const node, const KeyType &splitKey, SplitInfo *splitInfo) {
    auto &nodeRef = *node;
    /// determine the split position
    auto middle = (N + 1) / 2;
    /// adjust the middle based on the key we have to insert
    if (splitKey > nodeRef.keys[middle]) middle++;

    /// move all entries behind this position to a new sibling node
    const auto sibling = newBranchNode();
    auto &sibRef = *sibling;
    sibRef.numKeys = nodeRef.numKeys - middle;
    for (auto i = 0u; i < sibRef.numKeys; i++) {
      sibRef.keys[i] = nodeRef.keys[middle + i];
      sibRef.children[i] = nodeRef.children[middle + i];
706
    }
707
708
    sibRef.children[sibRef.numKeys] = nodeRef.children[nodeRef.numKeys];
    nodeRef.numKeys = middle - 1;
709

710
711
712
713
    auto &splitRef = *splitInfo;
    splitRef.key = nodeRef.keys[middle - 1];
    splitRef.leftChild = node;
    splitRef.rightChild = sibling;
714
715
716
  }

  /**
717
718
719
   * Traverse the tree starting at the root until the leaf node is found that could contain the
   * given @key. Note, that always a leaf node is returned even if the key doesn't exist on this
   * node.
720
721
722
723
   *
   * @param key the key we are looking for
   * @return the leaf node that would store the key
   */
724
  pptr<LeafNode> findLeafNode(const KeyType &key) const {
725
726
    auto node = rootNode;
    auto d = depth;
727
    while (d-- > 0) {
728
729
730
731
732
733
734
      auto n = node.branch;
      auto pos = lookupPositionInBranchNode(n, key);
      node = n->children[pos];
    }
    return node.leaf;
  }
  /**
735
   * Lookup the search key @c key in the given leaf node and return the position.
736
   * If the search key was not found, then @c M is returned.
737
738
739
   *
   * @param node the leaf node where we search
   * @param key the search key
740
   * @return the position of the key  (or @c M if not found)
741
   */
742
743
  auto lookupPositionInLeafNode(const pptr<LeafNode> &node, const KeyType &key) const {
    auto pos = 0u;
Philipp Götze's avatar
Philipp Götze committed
744
    const auto &nodeRef = *node;
745
    const auto hash = fpHash(key);
Philipp Götze's avatar
Philipp Götze committed
746
    const auto &keys = nodeRef.keys.get_ro();
747
748
749
750
751
    const auto &bits = nodeRef.bits.get_ro();
    const auto &hashs = nodeRef.fp.get_ro();

    for (; pos < M; ++pos)
      if (hashs[pos] == hash && bits.test(pos) && keys[pos] == key) break;
752
753
754
755
    return pos;
  }

  /**
756
757
758
   * Lookup the search key @c key in the given branch node and return the position which is the
   * position in the list of keys + 1. in this way, the position corresponds to the position of the
   * child pointer in the array @children.
759
760
761
   * If the search key is less than the smallest key, then @c 0 is returned.
   * If the key is greater than the largest key, then @c numKeys is returned.
   *
762
   * @param node the branch node where we search
763
764
765
   * @param key the search key
   * @return the position of the key + 1 (or 0 or @c numKey)
   */
766
767
  auto lookupPositionInBranchNode(const BranchNode * const node, const KeyType &key) const {
    const auto num = node->numKeys;
768
    const auto &keys = node->keys;
769
770
771
    // auto pos = 0u;
    // for (; pos < num && keys[pos] <= key; ++pos);
    // return pos;
772
    return binarySearch<true>(keys, 0, num - 1, key);
773
774
775
776
777
778
779
780
781
  }

  /**
   * Delete the element with the given key from the given leaf node.
   *
   * @param node the leaf node from which the element is deleted
   * @param key the key of the element to be deleted
   * @return true of the element was deleted
   */
782
  bool eraseFromLeafNode(const pptr<LeafNode> &node, const KeyType &key) {
783
    auto pos = lookupPositionInLeafNode(node, key);
784
785
786
787
788
789
790
791
792
793
794
795
796
    return eraseFromLeafNodeAtPosition(node, pos, key);
  }

  /**
   * Delete the element with the given position and key from the given leaf node.
   *
   * @param node the leaf node from which the element is deleted
   * @param pos the position of the key in the node
   * @param key the key of the element to be deleted
   * @return true of the element was deleted
   */
  bool eraseFromLeafNodeAtPosition(const pptr<LeafNode> &node, const unsigned int pos,
                                   const KeyType &key) {
797
    if (pos < M) {
798
799
      node->bits.get_rw().reset(pos);
      PersistEmulation::writeBytes<1>();
800
      return true;
801
    }
802
    return false;
803
804
805
  }

  /**
806
807
   * Delete an entry from the tree by recursively going down to the leaf level and handling the
   * underflows.
808
809
810
811
812
813
   *
   * @param node the current branch node
   * @param d the current depth of the traversal
   * @param key the key to be deleted
   * @return true if the entry was deleted
   */
814
815
  bool eraseFromBranchNode(BranchNode * const node, unsigned int d, const KeyType &key) {
    auto &nodeRef = *node;
816
817
    assert(d >= 1);
    bool deleted = false;
818
    /// try to find the branch
819
    auto pos = lookupPositionInBranchNode(node, key);
820
    if (d == 1) {
821
822
      /// the next level is the leaf level
      auto leaf = nodeRef.children[pos].leaf;
823
824
      assert(leaf != nullptr);
      deleted = eraseFromLeafNode(leaf, key);
825
      constexpr auto middle = (M + 1) / 2;
826
      if (leaf->bits.get_ro().count() < middle) {
827
        /// handle underflow
828
        underflowAtLeafLevel(node, pos, leaf);
829
830
      }
    } else {
831
      auto child = nodeRef.children[pos].branch;
832
833
834
      deleted = eraseFromBranchNode(child, d - 1, key);

      pos = lookupPositionInBranchNode(node, key);
835
      constexpr auto middle = (N + 1) / 2;
836
      if (child->numKeys < middle) {
837
        /// handle underflow
838
        child = underflowAtBranchLevel(node, pos, child);
839
840
        if (d == depth && nodeRef.numKeys == 0) {
          /// special case: the root node is empty now
841
          rootNode = child;
842
          --depth;
843
844
845
846
847
848
849
        }
      }
    }
    return deleted;
  }

  /**
850
851
   * Handle the case that during a delete operation a underflow at node @c leaf occured.
   * If possible this is handled
852
853
854
855
   * (1) by rebalancing the elements among the leaf node and one of its siblings
   * (2) if not possible by merging with one of its siblings.
   *
   * @param node the parent node of the node where the underflow occured
856
   * @param pos the position of the child node @leaf in the @c children array of the branch node
857
858
   * @param leaf the node at which the underflow occured
   */
859
860
861
  void underflowAtLeafLevel(BranchNode * const node, unsigned int pos, const pptr<LeafNode> &leaf) {
    auto &nodeRef = *node;
    auto &leafRef = *leaf;
862
    auto prevNumKeys = 0u, nextNumKeys = 0u;
863
864
865
866
    assert(pos <= nodeRef.numKeys);
    constexpr auto middle = (M + 1) / 2;
    /// 1. we check whether we can rebalance with one of the siblings but only if both nodes have
    ///    the same direct parent
867
    if (pos > 0 && (prevNumKeys = leafRef.prevLeaf->bits.get_ro().count()) > middle) {
868
869
870
      /// we have a sibling at the left for rebalancing the keys
      balanceLeafNodes(leafRef.prevLeaf, leaf);

871
      nodeRef.keys[pos - 1] =
872
          leafRef.keys.get_ro()[findMinKeyPos(leafRef.keys.get_ro(), leafRef.bits.get_ro())];
873
    } else if (pos < nodeRef.numKeys && leafRef.nextLeaf->bits.get_ro().count() > middle) {
874
875
      /// we have a sibling at the right for rebalancing the keys
      balanceLeafNodes(leafRef.nextLeaf, leaf);
876
      auto &nextLeaf = *leafRef.nextLeaf;
877
      nodeRef.keys[pos] =
878
          nextLeaf.keys.get_ro()[findMinKeyPos(nextLeaf.keys.get_ro(), nextLeaf.bits.get_ro())];
879
    } else {
880
881
882
      /// 2. if this fails we have to merge two leaf nodes but only if both nodes have the same
      ///    direct parent
      pptr<LeafNode> survivor = nullptr;
883
      if (pos > 0 && leafRef.prevLeaf->bits.get_ro().count() <= middle) {
884
        survivor = mergeLeafNodes(leafRef.prevLeaf, leaf);
885
        deleteLeafNode(leaf);
886
      } else if (pos < nodeRef.numKeys && leafRef.nextLeaf->bits.get_ro().count() <= middle) {
887
888
        /// because we update the pointers in mergeLeafNodes we keep it here
        auto l = leafRef.nextLeaf;
889
890
        survivor = mergeLeafNodes(leaf, l);
        deleteLeafNode(l);
891
892
      } else assert(false); ///< this shouldn't happen?!

893
      if (nodeRef.numKeys > 1) {
894
        if (pos > 0) pos--;
895
896
897
898
        /// just remove the child node from the current branch node
        for (auto i = pos; i < nodeRef.numKeys - 1; i++) {
          nodeRef.keys[i] = nodeRef.keys[i + 1];
          nodeRef.children[i + 1] = nodeRef.children[i + 2];
899
        }
900
901
        nodeRef.children[pos] = survivor;
        --nodeRef.numKeys;
902
      } else {
903
904
        /// This is a special case that happens only if the current node is the root node.
        /// Now, we have to replace the branch root node by a leaf node.
905
906
        rootNode = survivor;
        --depth;
907
908
      }
    }
909
  }
910
911

  /**
912
913
914
   * Handle the case that during a delete operation a underflow at node @c child occured where @c
   * node is the parent node. If possible this is handled
   * (1) by rebalancing the elements among the node @c child and one of its siblings
915
916
917
   * (2) if not possible by merging with one of its siblings.
   *
   * @param node the parent node of the node where the underflow occured
918
   * @param pos the position of the child node @child in the @c children array of the branch node
919
920
921
   * @param child the node at which the underflow occured
   * @return the (possibly new) child node (in case of a merge)
   */
922
923
  BranchNode* underflowAtBranchLevel(BranchNode * const node, unsigned int pos,
                                     BranchNode * const child) {
924
925
    assert(node != nullptr);
    assert(child != nullptr);
926
927
928
929
    auto &nodeRef = *node;
    auto newChild = child;
    constexpr auto middle = (N + 1) / 2;
    /// 1. we check whether we can rebalance with one of the siblings
930
931

    if (pos > 0 &&
932
933
        nodeRef.children[pos - 1].branch->numKeys >middle) {
      /// we have a sibling at the left for rebalancing the keys
934
      auto &sibling = nodeRef.children[pos - 1].branch;
935
936
      balanceBranchNodes(sibling, child, node, pos - 1);
      return newChild;
937
938
    } else if (pos < nodeRef.numKeys && nodeRef.children[pos + 1].branch->numKeys > middle) {
      /// we have a sibling at the right for rebalancing the keys
939
      auto &sibling = nodeRef.children[pos + 1].branch;
940
941
942
      balanceBranchNodes(sibling, child, node, pos);
      return newChild;
    } else {
943
      /// 2. if this fails we have to merge two branch nodes
944
945
946
      BranchNode *lSibling = nullptr, *rSibling = nullptr;
      unsigned int prevKeys = 0, nextKeys = 0;
      if (pos > 0) {
947
        lSibling = nodeRef.children[pos - 1].branch;
948
949
        prevKeys = lSibling->numKeys;
      }
950
951
      if (pos < nodeRef.numKeys) {
        rSibling = nodeRef.children[pos + 1].branch;
952
953
954
955
956
        nextKeys = rSibling->numKeys;
      }
      BranchNode *witnessNode = nullptr;
      auto ppos = pos;
      if (prevKeys > 0) {
957
        mergeBranchNodes(lSibling, nodeRef.keys[pos - 1], child);
958
959
960
        ppos = pos - 1;
        witnessNode = child;
        newChild = lSibling;
961
        /// pos -= 1;
962
      } else if (nextKeys > 0) {
963
        mergeBranchNodes(child, nodeRef.keys[pos], rSibling);
964
965
        witnessNode = rSibling;
      } else
966
        /// shouldn't happen
967
        assert(false);
968
969
970
      /// remove nodeRef.keys.get_ro()[pos] from node
      for (auto i = ppos; i < nodeRef.numKeys - 1; i++) {
        nodeRef.keys[i] = nodeRef.keys[i + 1];
971
972
      }
      if (pos == 0) pos++;
973
974
975
      for (auto i = pos; i < nodeRef.numKeys; i++) {
        if (i + 1 <= nodeRef.numKeys) {
          nodeRef.children[i] = nodeRef.children[i + 1];
976
977
        }
      }
978
      nodeRef.numKeys--;
979
980
981
982
983
984
      deleteBranchNode(witnessNode);
      return newChild;
    }
  }

  /**
985
986
987
   * Redistribute (key, value) pairs from the leaf node @c donor to the leaf node @c receiver such
   * that both nodes have approx. the same number of elements. This method is used in case of an
   * underflow situation of a leaf node.
988
989
990
991
   *
   * @param donor the leaf node from which the elements are taken
   * @param receiver the sibling leaf node getting the elements from @c donor
   */
992
993
994
  void balanceLeafNodes(const pptr<LeafNode> &donor, const pptr<LeafNode> &receiver) {
    auto &donorRef = *donor;
    auto &receiverRef = *receiver;
995
996
    const auto dNumKeys = donorRef.bits.get_ro().count();
    const auto rNumKeys = receiverRef.bits.get_ro().count();
997
998
999
    assert(dNumKeys > rNumKeys);
    unsigned int balancedNum = (dNumKeys + rNumKeys) / 2;
    unsigned int toMove = dNumKeys - balancedNum;
1000
    if (toMove == 0) return;
For faster browsing, not all history is shown. View entire blame