FPTree.hpp 46.3 KB
Newer Older
1
/*
2
 * Copyright (C) 2017-2019 DBIS Group - TU Ilmenau, All Rights Reserved.
3
 *
4
 * This file is part of our NVM-based Data Structures repository.
5
 *
6
7
8
 * This program is free software: you can redistribute it and/or modify it under the terms of the
 * GNU General Public License as published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
9
 *
10
11
12
 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 * See the GNU General Public License for more details.
13
 *
14
15
 * You should have received a copy of the GNU General Public License along with this program.
 * If not, see <http://www.gnu.org/licenses/>.
16
17
18
19
20
21
 */

#ifndef DBIS_FPTree_hpp_
#define DBIS_FPTree_hpp_

#include <array>
22
#include <bitset>
23
#include <cmath>
24
25
#include <iostream>

26
#include <libpmemobj/ctl.h>
27
28
29
30
31
32
#include <libpmemobj++/make_persistent.hpp>
#include <libpmemobj++/p.hpp>
#include <libpmemobj++/persistent_ptr.hpp>
#include <libpmemobj++/transaction.hpp>
#include <libpmemobj++/utils.hpp>

33
#include "config.h"
34
#include "utils/BitOperations.hpp"
35
#include "utils/PersistEmulation.hpp"
36
#include "utils/SearchFunctions.hpp"
37

38
namespace dbis::pbptrees {
39

40
using pmem::obj::allocation_flag;
41
42
43
44
45
using pmem::obj::delete_persistent;
using pmem::obj::make_persistent;
using pmem::obj::p;
using pmem::obj::persistent_ptr;
using pmem::obj::transaction;
46
47
template <typename Object>
using pptr = persistent_ptr<Object>;
48
49
50
51
52
53
54
55
56
57
58

/**
 * A persistent memory implementation of a FPTree.
 *
 * @tparam KeyType the data type of the key
 * @tparam ValueType the data type of the values associated with the key
 * @tparam N the maximum number of keys on a branch node
 * @tparam M the maximum number of keys on a leaf node
 */
template<typename KeyType, typename ValueType, int N, int M>
class FPTree {
59
  /// we need at least three keys on a branch node to be able to split
60
  static_assert(N > 2, "number of branch keys has to be >2.");
61
62
  /// we need an even order for branch nodes to be able to merge
  static_assert(N % 2 == 0, "order of branch nodes must be even.");
63
  /// we need at least one key on a leaf node
64
65
66
67
68
69
70
71
  static_assert(M > 0, "number of leaf keys should be >0.");

#ifndef UNIT_TESTS
  private:
#else
  public:
#endif

72
  /// Forward declarations
73
74
75
  struct LeafNode;
  struct BranchNode;

76
77
  struct Node {
    Node() : tag(BLANK) {};
78

79
    Node(const pptr<LeafNode> &leaf_) : tag(LEAF), leaf(leaf_) {};
80

81
    Node(BranchNode *branch_) : tag(BRANCH), branch(branch_) {};
82

83
    Node(const Node &other) { copy(other); };
84

85
    void copy(const Node &other) throw() {
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
      tag = other.tag;

      switch (tag) {
        case LEAF: {
          leaf = other.leaf;
          break;
        }
        case BRANCH: {
          branch = other.branch;
          break;
        }
        default: break;
      }
    }

101
    Node &operator=(Node other) {
102
103
104
105
106
      copy(other);
      return *this;
    }

    enum NodeType {
107
      BLANK, LEAF, BRANCH
108
109
    } tag;
    union {
110
      pptr<LeafNode> leaf;
111
112
113
114
115
116
117
      BranchNode *branch;
    };
  };

  /**
   * A structure for representing a leaf node of a B+ tree.
   */
118
119
120
121
122
  struct alignas(64) LeafNode {

    static constexpr auto NUM_KEYS = M;
    using KEY_TYPE = KeyType;

123
124
125
    /**
     * Constructor for creating a new empty leaf node.
     */
126
127
    LeafNode() : nextLeaf(nullptr), prevLeaf(nullptr) {}

128
129
130
131
132
133
134
135
136
137
138
139
140
    static constexpr auto BitsetSize = ((M + 63) / 64) * 8;  ///< number * size of words
    static constexpr auto HashSize = ((M + 7) / 8) * 8;  ///< round to 8 Byte
    static constexpr auto SearchSize = BitsetSize + HashSize + 32;
    static constexpr auto PaddingSize = (64 - SearchSize % 64) % 64;

    // p<LeafSearch> search;               ///< helper structure for faster searches
    p<std::bitset<M>> bits;              ///< bitset for valid entries
    p<std::array<uint8_t, M>> fp;        ///< fingerprint array (n & 0xFF)
    pptr<LeafNode> nextLeaf;             ///< pointer to the subsequent sibling
    pptr<LeafNode> prevLeaf;             ///< pointer to the preceeding sibling
    char padding[PaddingSize];           ///< padding to align keys to 64 bytes
    p<std::array<KeyType, M>> keys;      ///< the actual keys
    p<std::array<ValueType, M>> values;  ///< the actual values
141
142
143
144
145
  };

  /**
   * A structure for representing an branch node (branch node) of a B+ tree.
   */
146
147
148
149
  struct alignas(64) BranchNode {

    static constexpr auto NUM_KEYS = N;

150
151
152
153
154
    /**
     * Constructor for creating a new empty branch node.
     */
    BranchNode() : numKeys(0) {}

155
156
157
    unsigned int numKeys;             ///< the number of currently stored keys
    std::array<KeyType, N> keys;      ///< the actual keys
    std::array<Node, N + 1> children; ///< pointers to child nodes (BranchNode or LeafNode)
158
159
160
161
162
  };

  /**
   * Create a new empty leaf node
   */
163
  pptr<LeafNode> newLeafNode() {
164
    auto pop = pmem::obj::pool_by_vptr(this);
165
    pptr<LeafNode> newNode = nullptr;
166
167
168
    transaction::run(pop, [&] {
      newNode = make_persistent<LeafNode>(allocation_flag::class_id(alloc_class.class_id));
    });
169
170
171
    return newNode;
  }

172
  pptr<LeafNode> newLeafNode(const pptr<LeafNode> &other) {
173
    auto pop = pmem::obj::pool_by_vptr(this);
174
    pptr<LeafNode> newNode = nullptr;
175
176
177
    transaction::run(pop, [&] {
      newNode = make_persistent<LeafNode>(allocation_flag::class_id(alloc_class.class_id), *other);
    });
178
179
180
    return newNode;
  }

181
  void deleteLeafNode(const pptr<LeafNode> &node) {
182
    auto pop = pmem::obj::pool_by_vptr(this);
183
    transaction::run(pop, [&] { delete_persistent<LeafNode>(node); });
184
185
186
187
188
189
  }

  /**
   * Create a new empty branch node
   */
  BranchNode *newBranchNode() {
190
    return new BranchNode();
191
192
  }

193
  void deleteBranchNode(const BranchNode * const node) {
194
    delete node;
195
196
197
  }

  /**
198
   * A structure for passing information about a node split to the caller.
199
200
   */
  struct SplitInfo {
201
202
203
    KeyType key;     ///< the key at which the node was split
    Node leftChild;  ///< the resulting lhs child node
    Node rightChild; ///< the resulting rhs child node
204
205
  };

206
207
208
209
210
211
  static constexpr pobj_alloc_class_desc AllocClass{256, 64, 1, POBJ_HEADER_COMPACT};
  pobj_alloc_class_desc alloc_class;
  unsigned int depth;       /**< the depth of the tree, i.e. the number of levels
                                 (0 => rootNode is LeafNode) */
  Node rootNode;            ///< pointer to the root node
  pptr<LeafNode> leafList;  ///< pointer to the leaf at the most left position (for recovery)
212

213
214
215
  /* -------------------------------------------------------------------------------------------- */

 public:
216
217
218
219
  /**
   * Iterator for iterating over the leaf nodes
   */
  class iterator {
220
221
    pptr<LeafNode> currentNode;
    unsigned int currentPosition;
222
223
224

    public:
    iterator() : currentNode(nullptr), currentPosition(0) {}
225
226
    iterator(const Node &root, unsigned int d) {
      /// traverse to left-most key
227
      auto node = root;
228
      while (d-- > 0) node = node.branch->children[0];
229
230
      currentNode = node.leaf;
      currentPosition = 0;
231
      const auto &nodeBits = currentNode->bits.get_ro();
232
      /// Can not overflow as there are at least M/2 entries
233
      while(!nodeBits.test(currentPosition)) ++currentPosition;
234
235
236
    }

    iterator& operator++() {
237
      if (currentPosition >= M-1) {
238
239
        currentNode = currentNode->nextLeaf;
        currentPosition = 0;
240
        if (currentNode == nullptr) return *this;
241
        const auto &nodeBits = currentNode->bits.get_ro();
242
        while(!nodeBits.test(currentPosition)) ++currentPosition;
243
      } else if (!currentNode->bits.get_ro().test(++currentPosition)) ++(*this);
244
245
246
      return *this;
    }

247
248
249
250
251
252
253
254
255
256
257
    iterator operator++(int) {
      iterator retval = *this;
      ++(*this);
      return retval;
    }

    bool operator==(iterator other) const {
      return (currentNode == other.currentNode && currentPosition == other.currentPosition);
    }

    bool operator!=(iterator other) const { return !(*this == other); }
258
259

    std::pair<KeyType, ValueType> operator*() {
260
261
262
      const auto nodeRef = *currentNode;
      return std::make_pair(nodeRef.keys.get_ro()[currentPosition],
                            nodeRef.values.get_ro()[currentPosition]);
263
264
    }

265
    /// iterator traits
266
267
268
269
270
271
272
273
    using difference_type = long;
    using value_type = std::pair<KeyType, ValueType>;
    using pointer = const std::pair<KeyType, ValueType>*;
    using reference = const std::pair<KeyType, ValueType>&;
    using iterator_category = std::forward_iterator_tag;
  };
  iterator begin() { return iterator(rootNode, depth); }
  iterator end() { return iterator(); }
274
275
276
277
278
279
280
281

  /* -------------------------------------------------------------------------------------------- */

  /**
   * Typedef for a function passed to the scan method.
   */
  using ScanFunc = std::function<void(const KeyType &key, const ValueType &val)>;

282
283
284
  /**
   * Constructor for creating a new  tree.
   */
285
286
  explicit FPTree(struct pobj_alloc_class_desc _alloc) : depth(0), alloc_class(_alloc) {
  // FPTree() {
287
    rootNode = newLeafNode();
288
289
    leafList = rootNode.leaf;
    depth = 0;
290
291
    LOG("created new FPTree with sizeof(BranchNode) = "
        << sizeof(BranchNode) << ", sizeof(LeafNode) = " << sizeof(LeafNode));
292
293
294
295
296
  }

  /**
   * Destructor for the tree. Should delete all allocated nodes.
   */
297
  ~FPTree() {}
298
299

  /**
300
301
   * Insert an element (a key-value pair) into the tree. If the key @c key already exists, the
   * corresponding value is replaced by @c val.
302
303
304
305
306
   *
   * @param key the key of the element to be inserted
   * @param val the value that is associated with the key
   */
  void insert(const KeyType &key, const ValueType &val) {
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
    SplitInfo splitInfo;

    bool wasSplit = false;
    if (depth == 0) {
      /// the root node is a leaf node
      auto n = rootNode.leaf;
      wasSplit = insertInLeafNode(n, key, val, &splitInfo);
    } else {
      /// the root node is a branch node
      auto n = rootNode.branch;
      wasSplit = insertInBranchNode(n, depth, key, val, &splitInfo);
    }
    if (wasSplit) {
      /// we had an overflow in the node and therefore the node is split
      const auto root = newBranchNode();
      auto &rootRef = *root;
      rootRef.keys[0] = splitInfo.key;
      rootRef.children[0] = splitInfo.leftChild;
      rootRef.children[1] = splitInfo.rightChild;
      ++rootRef.numKeys;
      rootNode.branch = root;
      ++depth;
    }
330
331
332
  }

  /**
333
   * Find the given @c key in the  tree and if found return the corresponding value.
334
335
   *
   * @param key the key we are looking for
336
   * @param[out] val a pointer to memory where the value is stored if the key was found
337
338
339
340
   * @return true if the key was found, false otherwise
   */
  bool lookup(const KeyType &key, ValueType *val)  {
    assert(val != nullptr);
341
342
    const auto leaf = findLeafNode(key);
    const auto pos = lookupPositionInLeafNode(leaf, key);
343
    if (pos < M) {
344
345
346
      /// we found it!
      *val = leaf->values.get_ro()[pos];
      return true;
347
    }
348
    return false;
349
350
351
352
353
354
355
356
357
358
  }

  /**
   * Delete the entry with the given key @c key from the tree.
   *
   * @param key the key of the entry to be deleted
   * @return true if the key was found and deleted
   */
  bool erase(const KeyType &key) {
    bool result;
359
360
361
362
363
364
365
366
367
368
    if (depth == 0) {
      /// special case: the root node is a leaf node and there is no need to handle underflow
      auto node = rootNode.leaf;
      assert(node != nullptr);
      result=eraseFromLeafNode(node, key);
    } else {
      auto node = rootNode.branch;
      assert(node != nullptr);
      result=eraseFromBranchNode(node, depth, key);
    }
369
370
    return result;
  }
371

372
373
374
  /**
   * Recover the FPTree by iterating over the LeafList and using the recoveryInsert method.
   */
375
  void recover() {
376
    LOG("Starting RECOVERY of FPTree");
377
    pptr<LeafNode> currentLeaf = leafList;
378
    if (leafList == nullptr) {
379
      LOG("No data to recover FPTree");
380
      return;
381
    }
382
383
384
385
386
387
    /* counting leafs */
    auto leafs = 0u;
    while(currentLeaf != nullptr) {
      ++leafs;
      currentLeaf = currentLeaf->nextLeaf;
    }
388
389
    // float x = std::log(leafs)/std::log(N+1);
    // assert(x == int(x) && "Not supported for this amount of leafs, yet");
390
391
392
393

    /* actual recovery */
    currentLeaf = leafList;
    if (leafList->nextLeaf == nullptr) {
394
      /// The index has only one node, so the leaf node becomes the root node
395
396
      rootNode = leafList;
      depth = 0;
397
    } else {
398
      rootNode = newBranchNode();
399
      depth = 1;
400
401
      rootNode.branch->children[0] = currentLeaf;
      currentLeaf = currentLeaf->nextLeaf;
402
403
404
405
406
      while (currentLeaf != nullptr) {
        recoveryInsert(currentLeaf);
        currentLeaf = currentLeaf->nextLeaf;
      }
    }
407
    LOG("RECOVERY Done")
408
409
410
411
412
413
  }

  /**
   * Print the structure and content of the tree to stdout.
   */
  void print() const {
414
415
    if (depth == 0) printLeafNode(0, rootNode.leaf);
    else printBranchNode(0u, rootNode.branch);
416
417
418
419
420
421
422
423
  }

  /**
   * Perform a scan over all key-value pairs stored in the tree.
   * For each entry the given function @func is called.
   *
   * @param func the function called for each entry
   */
424
  void scan(ScanFunc func) const {
425
    /// we traverse to the leftmost leaf node
426
427
    auto node = rootNode;
    auto d = depth;
428
    while ( d-- > 0) node = node.branch->children[0];
429
430
    auto leaf = node.leaf;
    while (leaf != nullptr) {
431
      auto &leafRef = *leaf;
432
      /// for each key-value pair call func
433
      const auto &leafBits = leafRef.bits.get_ro();
434
435
436
437
438
439
      const auto &leafKeys = leafRef.keys.get_ro();
      const auto &leafValues = leafRef.values.get_ro();
      for (auto i = 0u; i < M; i++) {
        if (!leafBits.test(i)) continue;
        const auto &key = leafKeys[i];
        const auto &val = leafValues[i];
440
441
        func(key, val);
      }
442
443
      /// move to the next leaf node
      leaf = leafRef.nextLeaf;
444
    }
445
  }
446
447

  /**
448
449
   * Perform a range scan over all elements within the range [minKey, maxKey] and for each element
   * call the given function @c func.
450
451
452
453
454
   *
   * @param minKey the lower boundary of the range
   * @param maxKey the upper boundary of the range
   * @param func the function called for each entry
   */
455
  void scan(const KeyType &minKey, const KeyType &maxKey, ScanFunc func) const {
456
457
458
459
    auto leaf = findLeafNode(minKey);

    bool higherThanMax = false;
    while (!higherThanMax && leaf != nullptr) {
460
461
      /// for each key-value pair within the range call func
      const auto leafRef = *leaf;
462
      const auto &leafBits = leafRef.bits.get_ro();
463
464
      const auto &leafKeys = leafRef.keys.get_ro();
      const auto &leafValues = leafRef.values.get_ro();
465
      for (auto i = 0u; i < M; i++) {
466
467
        if (!leafBits.test(i)) continue;
        auto &key = leafKeys[i];
468
469
470
        if (key < minKey) continue;
        if (key > maxKey) { higherThanMax = true; continue; }

471
        auto &val = leafValues[i];
472
473
        func(key, val);
      }
474
475
      /// move to the next leaf node
      leaf = leafRef.nextLeaf;
476
    }
477
  }
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492

#ifndef UNIT_TESTS
  private:
#endif

  /**
   * Insert a (key, value) pair into the corresponding leaf node. It is the
   * responsibility of the caller to make sure that the node @c node is
   * the correct node. The key is inserted at the correct position.
   *
   * @param node the node where the key-value pair is inserted.
   * @param key the key to be inserted
   * @param val the value associated with the key
   * @param splitInfo information about a possible split of the node
   */
493
  bool insertInLeafNode(const pptr<LeafNode> &node, const KeyType &key,
494
      const ValueType &val, SplitInfo *splitInfo) {
495
    auto &nodeRef = *node;
496
497
498
    bool split = false;
    auto pos = lookupPositionInLeafNode(node, key);

499
    if (pos < M) {
500
      /// handle insert of duplicates
501
      nodeRef.values.get_rw()[pos] = val;
502
      return false;
503
    }
504
    pos = BitOperations::getFreeZero(nodeRef.bits.get_ro());
505
    if (pos == M) {
506
507
508
509
510
511
512
      /* split the node */
      splitLeafNode(node, splitInfo);
      auto &splitRef = *splitInfo;
      auto sibling= splitRef.rightChild.leaf;
      auto &sibRef= *sibling;

      /* insert the new entry */
513
      if (key > splitRef.key) {
514
        insertInLeafNodeAtPosition(sibling, BitOperations::getFreeZero(sibRef.bits.get_ro()), key, val);
515
      } else {
516
        if (key > findMaxKey(nodeRef.keys.get_ro(), nodeRef.bits.get_ro())) {
517
          /// Special case: new key would be the middle, thus must be right
518
          insertInLeafNodeAtPosition(sibling, BitOperations::getFreeZero(sibRef.bits.get_ro()), key, val);
519
520
          splitRef.key = key;
        } else {
521
          insertInLeafNodeAtPosition(node, BitOperations::getFreeZero(nodeRef.bits.get_ro()), key, val);
522
523
        }
      }
524
      /* inform the caller about the split */
525
526
      split = true;
    } else {
527
      /* otherwise, we can simply insert the new entry at the given position */
528
529
530
531
532
      insertInLeafNodeAtPosition(node, pos, key, val);
    }
    return split;
  }

533
534
535
536
537
538
539
  /**
   * Split the given leaf node @c node in the middle and move half of the keys/children to the new
   * sibling node.
   *
   * @param node the leaf node to be split
   * @param splitInfo[out] information about the split
   */
540
  void splitLeafNode(const pptr<LeafNode> &node, SplitInfo *splitInfo) {
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
    auto &nodeRef = *node;

    /* determine the split position by finding median in unsorted array of keys*/
    const auto data = nodeRef.keys.get_ro();
    auto [bitmap, splitPos] = findSplitKey(data);
    const auto &splitKey = data[splitPos];

    /// copy leaf
    /*
    const auto sibling = newLeafNode(node);
    auto &sibRef = *sibling;
    nodeRef.bits.get_rw() = bitmap;
    sibRef.bits.get_rw() = bitmap.flip();
    PersistEmulation::writeBytes<sizeof(LeafNode) + ((2*M+7)>>3)>(); /// copy leaf + 2 bitmaps
    */

    /// Alternative: move instead of complete copy
    ///*
    const auto sibling = newLeafNode();
    auto &sibRef = *sibling;
    auto &sibBits = sibRef.bits.get_rw();
    auto &sibHashs = sibRef.fp.get_rw();
    auto &sibKeys = sibRef.keys.get_rw();
    auto &sibValues = sibRef.values.get_rw();
    auto &nodeBits = nodeRef.bits.get_rw();
    const auto &nodeHashs = nodeRef.fp.get_ro();
    const auto &nodeKeys = nodeRef.keys.get_ro();
    const auto &nodeValues = nodeRef.values.get_ro();
    auto j = 0u;
    for (auto i = 0u; i < M; i++) {
      if (nodeKeys[i] >= splitKey) {
        sibKeys[j] = nodeKeys[i];
        sibValues[j] = nodeValues[i];
        sibHashs[j] = nodeHashs[i];
        sibBits.set(j);
        nodeBits.reset(i);
        j++;
578
      }
579
580
581
582
583
584
585
586
587
588
589
590
591
592
    }
    PersistEmulation::writeBytes(j * (sizeof(KeyType) + sizeof(ValueType) + 1) +
                                 ((j * 2 + 7) >> 3));  /// j entries/hashes + j*2 bits
    // */

    /// setup the list of leaf nodes
    if (nodeRef.nextLeaf != nullptr) {
      sibRef.nextLeaf = nodeRef.nextLeaf;
      nodeRef.nextLeaf->prevLeaf = sibling;
      PersistEmulation::writeBytes<16 * 2>();
    }
    nodeRef.nextLeaf = sibling;
    sibRef.prevLeaf = node;
    PersistEmulation::writeBytes<16 * 2>();
593

594
595
596
597
598
    /// set split information
    auto &splitRef = *splitInfo;
    splitRef.leftChild = node;
    splitRef.rightChild = sibling;
    splitRef.key = splitKey;
599
600
  }

601
602
603
604
605
606
607
608
609
610
611
612
  /**
   * Insert a (key, value) pair at the given position @c pos into the leaf node
   * @c node. The caller has to ensure that
   * - there is enough space to insert the element
   * - the key is inserted at the correct position according to the order of
   * keys
   *
   * @oaram node the leaf node where the element is to be inserted
   * @param pos the position in the leaf node (0 <= pos <= numKeys < M)
   * @param key the key of the element
   * @param val the actual value corresponding to the key
   */
613
  void insertInLeafNodeAtPosition(const pptr<LeafNode> &node, const unsigned int pos,
614
                                  const KeyType &key, const ValueType &val) {
615
    assert(pos < M);
616
    auto &nodeRef = *node;
617

618
619
620
    /// insert the new entry at the given position
    nodeRef.keys.get_rw()[pos] = key;
    nodeRef.values.get_rw()[pos] = val;
621

622
    /// set bit and hash
623
624
    nodeRef.bits.get_rw().set(pos);
    nodeRef.fp.get_rw()[pos] = fpHash(key);
625
    PersistEmulation::writeBytes<sizeof(KeyType) + sizeof(ValueType) + 2>();
626
627
628
629
630
631
632
  }

  /**
   * Insert a (key, value) pair into the tree recursively by following the path
   * down to the leaf level starting at node @c node at depth @c depth.
   *
   * @param node the starting node for the insert
633
   * @param d the current depth of the tree (0 == leaf level)
634
635
636
637
638
   * @param key the key of the element
   * @param val the actual value corresponding to the key
   * @param splitInfo information about the split
   * @return true if a split was performed
   */
639
640
  bool insertInBranchNode(BranchNode * const node, unsigned int d, const KeyType &key,
                          const ValueType &val, SplitInfo *splitInfo) {
641
642
    SplitInfo childSplitInfo;
    bool split = false, hasSplit = false;
643
    auto &nodeRef = *node;
644
645

    auto pos = lookupPositionInBranchNode(node, key);
646
647
648
    if (d == 1) {
      /// case #1: our children are leaf nodes
      auto child = nodeRef.children[pos].leaf;
649
      hasSplit = insertInLeafNode(child, key, val, &childSplitInfo);
650
    } else {
651
652
653
      /// case #2: our children are branch nodes
      auto child = nodeRef.children[pos].branch;
      hasSplit = insertInBranchNode(child, d - 1, key, val, &childSplitInfo);
654
655
    }
    if (hasSplit) {
656
657
658
659
      auto host = node;
      /// the child node was split, thus we have to add a new entry
      /// to our branch node
      if (nodeRef.numKeys == N) {
660
        splitBranchNode(node, childSplitInfo.key, splitInfo);
661
662
        const auto &splitRef = *splitInfo;
        host = (key < splitRef.key ? splitRef.leftChild : splitRef.rightChild).branch;
663
664
665
        split = true;
        pos = lookupPositionInBranchNode(host, key);
      }
666
667
668
669
670
671
672
673
      auto &hostRef = *host;
      if (pos < hostRef.numKeys) {
        /// if the child isn't inserted at the rightmost position
        /// then we have to make space for it
        hostRef.children[hostRef.numKeys + 1] = hostRef.children[hostRef.numKeys];
        for (auto i = hostRef.numKeys; i > pos; i--) {
          hostRef.children[i] = hostRef.children[i - 1];
          hostRef.keys[i] = hostRef.keys[i - 1];
674
675
        }
      }
676
677
678
679
680
      /// finally, add the new entry at the given position
      hostRef.keys[pos] = childSplitInfo.key;
      hostRef.children[pos] = childSplitInfo.leftChild;
      hostRef.children[pos + 1] = childSplitInfo.rightChild;
      hostRef.numKeys = hostRef.numKeys + 1;
681
682
683
684
685
    }
    return split;
  }

  /**
686
687
   * Split the given branch node @c node in the middle and move half of the keys/children to the new
   * sibling node.
688
689
690
691
692
   *
   * @param node the branch node to be split
   * @param splitKey the key on which the split of the child occured
   * @param splitInfo information about the split
   */
693
694
695
696
697
698
699
700
701
702
703
704
705
706
  void splitBranchNode(BranchNode * const node, const KeyType &splitKey, SplitInfo *splitInfo) {
    auto &nodeRef = *node;
    /// determine the split position
    auto middle = (N + 1) / 2;
    /// adjust the middle based on the key we have to insert
    if (splitKey > nodeRef.keys[middle]) middle++;

    /// move all entries behind this position to a new sibling node
    const auto sibling = newBranchNode();
    auto &sibRef = *sibling;
    sibRef.numKeys = nodeRef.numKeys - middle;
    for (auto i = 0u; i < sibRef.numKeys; i++) {
      sibRef.keys[i] = nodeRef.keys[middle + i];
      sibRef.children[i] = nodeRef.children[middle + i];
707
    }
708
709
    sibRef.children[sibRef.numKeys] = nodeRef.children[nodeRef.numKeys];
    nodeRef.numKeys = middle - 1;
710

711
712
713
714
    auto &splitRef = *splitInfo;
    splitRef.key = nodeRef.keys[middle - 1];
    splitRef.leftChild = node;
    splitRef.rightChild = sibling;
715
716
717
  }

  /**
718
719
720
   * Traverse the tree starting at the root until the leaf node is found that could contain the
   * given @key. Note, that always a leaf node is returned even if the key doesn't exist on this
   * node.
721
722
723
724
   *
   * @param key the key we are looking for
   * @return the leaf node that would store the key
   */
725
  pptr<LeafNode> findLeafNode(const KeyType &key) const {
726
727
    auto node = rootNode;
    auto d = depth;
728
    while (d-- > 0) {
729
730
731
732
733
734
735
      auto n = node.branch;
      auto pos = lookupPositionInBranchNode(n, key);
      node = n->children[pos];
    }
    return node.leaf;
  }
  /**
736
   * Lookup the search key @c key in the given leaf node and return the position.
737
   * If the search key was not found, then @c M is returned.
738
739
740
   *
   * @param node the leaf node where we search
   * @param key the search key
741
   * @return the position of the key  (or @c M if not found)
742
   */
743
744
  auto lookupPositionInLeafNode(const pptr<LeafNode> &node, const KeyType &key) const {
    auto pos = 0u;
Philipp Götze's avatar
Philipp Götze committed
745
    const auto &nodeRef = *node;
746
    const auto hash = fpHash(key);
Philipp Götze's avatar
Philipp Götze committed
747
    const auto &keys = nodeRef.keys.get_ro();
748
749
750
751
752
    const auto &bits = nodeRef.bits.get_ro();
    const auto &hashs = nodeRef.fp.get_ro();

    for (; pos < M; ++pos)
      if (hashs[pos] == hash && bits.test(pos) && keys[pos] == key) break;
753
754
755
756
    return pos;
  }

  /**
757
758
759
   * Lookup the search key @c key in the given branch node and return the position which is the
   * position in the list of keys + 1. in this way, the position corresponds to the position of the
   * child pointer in the array @children.
760
761
762
   * If the search key is less than the smallest key, then @c 0 is returned.
   * If the key is greater than the largest key, then @c numKeys is returned.
   *
763
   * @param node the branch node where we search
764
765
766
   * @param key the search key
   * @return the position of the key + 1 (or 0 or @c numKey)
   */
767
768
  auto lookupPositionInBranchNode(const BranchNode * const node, const KeyType &key) const {
    const auto num = node->numKeys;
769
    const auto &keys = node->keys;
770
771
772
    // auto pos = 0u;
    // for (; pos < num && keys[pos] <= key; ++pos);
    // return pos;
773
    return binarySearch<true>(keys, 0, num - 1, key);
774
775
776
777
778
779
780
781
782
  }

  /**
   * Delete the element with the given key from the given leaf node.
   *
   * @param node the leaf node from which the element is deleted
   * @param key the key of the element to be deleted
   * @return true of the element was deleted
   */
783
  bool eraseFromLeafNode(const pptr<LeafNode> &node, const KeyType &key) {
784
    auto pos = lookupPositionInLeafNode(node, key);
785
786
787
788
789
790
791
792
793
794
795
796
797
    return eraseFromLeafNodeAtPosition(node, pos, key);
  }

  /**
   * Delete the element with the given position and key from the given leaf node.
   *
   * @param node the leaf node from which the element is deleted
   * @param pos the position of the key in the node
   * @param key the key of the element to be deleted
   * @return true of the element was deleted
   */
  bool eraseFromLeafNodeAtPosition(const pptr<LeafNode> &node, const unsigned int pos,
                                   const KeyType &key) {
798
    if (pos < M) {
799
800
      node->bits.get_rw().reset(pos);
      PersistEmulation::writeBytes<1>();
801
      return true;
802
    }
803
    return false;
804
805
806
  }

  /**
807
808
   * Delete an entry from the tree by recursively going down to the leaf level and handling the
   * underflows.
809
810
811
812
813
814
   *
   * @param node the current branch node
   * @param d the current depth of the traversal
   * @param key the key to be deleted
   * @return true if the entry was deleted
   */
815
816
  bool eraseFromBranchNode(BranchNode * const node, unsigned int d, const KeyType &key) {
    auto &nodeRef = *node;
817
818
    assert(d >= 1);
    bool deleted = false;
819
    /// try to find the branch
820
    auto pos = lookupPositionInBranchNode(node, key);
821
    if (d == 1) {
822
823
      /// the next level is the leaf level
      auto leaf = nodeRef.children[pos].leaf;
824
825
      assert(leaf != nullptr);
      deleted = eraseFromLeafNode(leaf, key);
826
      constexpr auto middle = (M + 1) / 2;
827
      if (leaf->bits.get_ro().count() < middle) {
828
        /// handle underflow
829
        underflowAtLeafLevel(node, pos, leaf);
830
831
      }
    } else {
832
      auto child = nodeRef.children[pos].branch;
833
834
835
      deleted = eraseFromBranchNode(child, d - 1, key);

      pos = lookupPositionInBranchNode(node, key);
836
      constexpr auto middle = (N + 1) / 2;
837
      if (child->numKeys < middle) {
838
        /// handle underflow
839
        child = underflowAtBranchLevel(node, pos, child);
840
841
        if (d == depth && nodeRef.numKeys == 0) {
          /// special case: the root node is empty now
842
          rootNode = child;
843
          --depth;
844
845
846
847
848
849
850
        }
      }
    }
    return deleted;
  }

  /**
851
852
   * Handle the case that during a delete operation a underflow at node @c leaf occured.
   * If possible this is handled
853
854
855
856
   * (1) by rebalancing the elements among the leaf node and one of its siblings
   * (2) if not possible by merging with one of its siblings.
   *
   * @param node the parent node of the node where the underflow occured
857
   * @param pos the position of the child node @leaf in the @c children array of the branch node
858
859
   * @param leaf the node at which the underflow occured
   */
860
861
862
  void underflowAtLeafLevel(BranchNode * const node, unsigned int pos, const pptr<LeafNode> &leaf) {
    auto &nodeRef = *node;
    auto &leafRef = *leaf;
863
    auto prevNumKeys = 0u, nextNumKeys = 0u;
864
865
866
867
    assert(pos <= nodeRef.numKeys);
    constexpr auto middle = (M + 1) / 2;
    /// 1. we check whether we can rebalance with one of the siblings but only if both nodes have
    ///    the same direct parent
868
    if (pos > 0 && (prevNumKeys = leafRef.prevLeaf->bits.get_ro().count()) > middle) {
869
870
871
      /// we have a sibling at the left for rebalancing the keys
      balanceLeafNodes(leafRef.prevLeaf, leaf);

872
873
874
      nodeRef.keys[pos - 1] =
          leafRef.keys.get_ro()[findMinKey(leafRef.keys.get_ro(), leafRef.bits.get_ro())];
    } else if (pos < nodeRef.numKeys && leafRef.nextLeaf->bits.get_ro().count() > middle) {
875
876
      /// we have a sibling at the right for rebalancing the keys
      balanceLeafNodes(leafRef.nextLeaf, leaf);
877
      auto &nextLeaf = *leafRef.nextLeaf;
878
879
      nodeRef.keys[pos] =
          nextLeaf.keys.get_ro()[findMinKey(nextLeaf.keys.get_ro(), nextLeaf.bits.get_ro())];
880
    } else {
881
882
883
      /// 2. if this fails we have to merge two leaf nodes but only if both nodes have the same
      ///    direct parent
      pptr<LeafNode> survivor = nullptr;
884
      if (pos > 0 && leafRef.prevLeaf->bits.get_ro().count() <= middle) {
885
        survivor = mergeLeafNodes(leafRef.prevLeaf, leaf);
886
        deleteLeafNode(leaf);
887
      } else if (pos < nodeRef.numKeys && leafRef.nextLeaf->bits.get_ro().count() <= middle) {
888
889
        /// because we update the pointers in mergeLeafNodes we keep it here
        auto l = leafRef.nextLeaf;
890
891
        survivor = mergeLeafNodes(leaf, l);
        deleteLeafNode(l);
892
893
      } else assert(false); ///< this shouldn't happen?!

894
      if (nodeRef.numKeys > 1) {
895
        if (pos > 0) pos--;
896
897
898
899
        /// just remove the child node from the current branch node
        for (auto i = pos; i < nodeRef.numKeys - 1; i++) {
          nodeRef.keys[i] = nodeRef.keys[i + 1];
          nodeRef.children[i + 1] = nodeRef.children[i + 2];
900
        }
901
902
        nodeRef.children[pos] = survivor;
        --nodeRef.numKeys;
903
      } else {
904
905
        /// This is a special case that happens only if the current node is the root node.
        /// Now, we have to replace the branch root node by a leaf node.
906
907
        rootNode = survivor;
        --depth;
908
909
      }
    }
910
  }
911
912

  /**
913
914
915
   * Handle the case that during a delete operation a underflow at node @c child occured where @c
   * node is the parent node. If possible this is handled
   * (1) by rebalancing the elements among the node @c child and one of its siblings
916
917
918
   * (2) if not possible by merging with one of its siblings.
   *
   * @param node the parent node of the node where the underflow occured
919
   * @param pos the position of the child node @child in the @c children array of the branch node
920
921
922
   * @param child the node at which the underflow occured
   * @return the (possibly new) child node (in case of a merge)
   */
923
924
  BranchNode* underflowAtBranchLevel(BranchNode * const node, unsigned int pos,
                                     BranchNode * const child) {
925
926
    assert(node != nullptr);
    assert(child != nullptr);
927
928
929
930
    auto &nodeRef = *node;
    auto newChild = child;
    constexpr auto middle = (N + 1) / 2;
    /// 1. we check whether we can rebalance with one of the siblings
931
932

    if (pos > 0 &&
933
934
        nodeRef.children[pos - 1].branch->numKeys >middle) {
      /// we have a sibling at the left for rebalancing the keys
935
      auto &sibling = nodeRef.children[pos - 1].branch;
936
937
      balanceBranchNodes(sibling, child, node, pos - 1);
      return newChild;
938
939
    } else if (pos < nodeRef.numKeys && nodeRef.children[pos + 1].branch->numKeys > middle) {
      /// we have a sibling at the right for rebalancing the keys
940
      auto &sibling = nodeRef.children[pos + 1].branch;
941
942
943
      balanceBranchNodes(sibling, child, node, pos);
      return newChild;
    } else {
944
      /// 2. if this fails we have to merge two branch nodes
945
946
947
      BranchNode *lSibling = nullptr, *rSibling = nullptr;
      unsigned int prevKeys = 0, nextKeys = 0;
      if (pos > 0) {
948
        lSibling = nodeRef.children[pos - 1].branch;
949
950
        prevKeys = lSibling->numKeys;
      }
951
952
      if (pos < nodeRef.numKeys) {
        rSibling = nodeRef.children[pos + 1].branch;
953
954
955
956
957
        nextKeys = rSibling->numKeys;
      }
      BranchNode *witnessNode = nullptr;
      auto ppos = pos;
      if (prevKeys > 0) {
958
        mergeBranchNodes(lSibling, nodeRef.keys[pos - 1], child);
959
960
961
        ppos = pos - 1;
        witnessNode = child;
        newChild = lSibling;
962
        /// pos -= 1;
963
      } else if (nextKeys > 0) {
964
        mergeBranchNodes(child, nodeRef.keys[pos], rSibling);
965
966
        witnessNode = rSibling;
      } else
967
        /// shouldn't happen
968
        assert(false);
969
970
971
      /// remove nodeRef.keys.get_ro()[pos] from node
      for (auto i = ppos; i < nodeRef.numKeys - 1; i++) {
        nodeRef.keys[i] = nodeRef.keys[i + 1];
972
973
      }
      if (pos == 0) pos++;
974
975
976
      for (auto i = pos; i < nodeRef.numKeys; i++) {
        if (i + 1 <= nodeRef.numKeys) {
          nodeRef.children[i] = nodeRef.children[i + 1];
977
978
        }
      }
979
      nodeRef.numKeys--;
980
981
982
983
984
985
      deleteBranchNode(witnessNode);
      return newChild;
    }
  }

  /**
986
987
988
   * Redistribute (key, value) pairs from the leaf node @c donor to the leaf node @c receiver such
   * that both nodes have approx. the same number of elements. This method is used in case of an
   * underflow situation of a leaf node.
989
990
991
992
   *
   * @param donor the leaf node from which the elements are taken
   * @param receiver the sibling leaf node getting the elements from @c donor
   */
993
994
995
  void balanceLeafNodes(const pptr<LeafNode> &donor, const pptr<LeafNode> &receiver) {
    auto &donorRef = *donor;
    auto &receiverRef = *receiver;
996
997
    const auto dNumKeys = donorRef.bits.get_ro().count();
    const auto rNumKeys = receiverRef.bits.get_ro().count();
998
999
1000
    assert(dNumKeys > rNumKeys);
    unsigned int balancedNum = (dNumKeys + rNumKeys) / 2;
    unsigned int toMove = dNumKeys - balancedNum;
1001
1002
    if (toMove == 0) return;

1003
1004
    auto &receiverBits = receiverRef.bits.get_rw();
    auto &receiverHashs = receiverRef.fp.get_rw();
1005
1006
    auto &receiverKeys = receiverRef.keys.get_rw();
    auto &receiverValues = receiverRef.values.get_rw();
1007
1008
    auto &donorBits = donorRef.bits.get_rw();
    const auto &donorHashs = donorRef.fp.get_ro();
1009
1010
1011
1012
    const auto &donorKeys = donorRef.keys.get_ro();
    const auto &donorValues = donorRef.values.get_ro();

    if (donorKeys[0] < receiverKeys[0]) {
1013
      /// move to a node with larger keys
1014
      for (auto i = 0u; i < toMove; ++i) {
1015
1016
1017
1018
        const auto max = findMaxKey(donorKeys, donorBits);
        const auto pos = BitOperations::getFreeZero(receiverBits);
        receiverBits.set(pos);
        receiverHashs[pos] = fpHash(donorKeys[max]);
1019
1020
        receiverKeys[pos] = donorKeys[max];
        receiverValues[pos] = donorValues[max];
1021
        donorBits.reset(max);
1022
1023
      }
    } else {
1024
      /// move to a node with smaller keys
1025
      for (auto i = 0u; i < toMove; ++i) {
1026
1027
1028
1029
        const auto min = findMinKey(donorKeys, donorBits);
        const auto pos = BitOperations::getFreeZero(receiverBits);
        receiverBits.set(pos);
        receiverHashs[pos] = fpHash(donorKeys[min]);
1030
1031
        receiverKeys[pos] = donorKeys[min];
        receiverValues[pos] = donorValues[min];
1032
        donorBits.reset(min);
1033
1034
      }
    }
1035
1036
1037
1038
    PersistEmulation::writeBytes(
        toMove * (sizeof(KeyType) + sizeof(ValueType) + 1) +  ///< move keys, vals, hashs
        ((2 * toMove + 7) >> 3)                               ///< (re)set ceiled bits
    );
1039
1040
1041
  }

  /**
1042
1043
1044
   * Rebalance two branch nodes by moving some key-children pairs from the node @c donor to the node
   * @receiver via the parent node @parent. The position of the key between the two nodes is denoted
   * by @c pos.
1045
1046
1047
1048
   *
   * @param donor the branch node from which the elements are taken
   * @param receiver the sibling branch node getting the elements from @c donor
   * @param parent the parent node of @c donor and @c receiver
1049
   * @param pos the position of the key in node @c parent that lies between @c donor and @c receiver
1050
   */
1051
1052
1053
1054
1055
1056
1057
1058
1059
  void balanceBranchNodes(BranchNode * const donor, BranchNode * const receiver,
                          BranchNode * const parent, const unsigned int pos) {
    auto &donorRef = *donor;
    auto &receiverRef = *receiver;
    auto &parentRef = *parent;
    assert(donorRef.numKeys > receiverRef.numKeys);

    unsigned int balancedNum = (donorRef.numKeys + receiverRef.numKeys) / 2;
    unsigned int toMove = donorRef.numKeys - balancedNum;
1060
1061
    if (toMove == 0) return;

1062
1063
    if (donorRef.keys[0] < receiverRef.keys[0]) {
      /// move from one node to a node with larger keys
1064
      unsigned int i = 0;