FPTree.hpp 46.2 KB
Newer Older
1001
1002
    auto &receiverBits = receiverRef.bits.get_rw();
    auto &receiverHashs = receiverRef.fp.get_rw();
1003
1004
    auto &receiverKeys = receiverRef.keys.get_rw();
    auto &receiverValues = receiverRef.values.get_rw();
1005
1006
    auto &donorBits = donorRef.bits.get_rw();
    const auto &donorHashs = donorRef.fp.get_ro();
1007
1008
1009
1010
    const auto &donorKeys = donorRef.keys.get_ro();
    const auto &donorValues = donorRef.values.get_ro();

    if (donorKeys[0] < receiverKeys[0]) {
1011
      /// move to a node with larger keys
1012
      for (auto i = 0u; i < toMove; ++i) {
1013
        const auto max = findMaxKeyPos(donorKeys, donorBits);
1014
        const auto pos = receiverBits.getFreeZero();
1015
1016
        receiverBits.set(pos);
        receiverHashs[pos] = fpHash(donorKeys[max]);
1017
1018
        receiverKeys[pos] = donorKeys[max];
        receiverValues[pos] = donorValues[max];
1019
        donorBits.reset(max);
1020
1021
      }
    } else {
1022
      /// move to a node with smaller keys
1023
      for (auto i = 0u; i < toMove; ++i) {
1024
        const auto min = findMinKeyPos(donorKeys, donorBits);
1025
        const auto pos = receiverBits.getFreeZero();
1026
1027
        receiverBits.set(pos);
        receiverHashs[pos] = fpHash(donorKeys[min]);
1028
1029
        receiverKeys[pos] = donorKeys[min];
        receiverValues[pos] = donorValues[min];
1030
        donorBits.reset(min);
1031
1032
      }
    }
1033
1034
1035
1036
    PersistEmulation::writeBytes(
        toMove * (sizeof(KeyType) + sizeof(ValueType) + 1) +  ///< move keys, vals, hashs
        ((2 * toMove + 7) >> 3)                               ///< (re)set ceiled bits
    );
1037
1038
1039
  }

  /**
1040
1041
1042
   * Rebalance two branch nodes by moving some key-children pairs from the node @c donor to the node
   * @receiver via the parent node @parent. The position of the key between the two nodes is denoted
   * by @c pos.
1043
1044
1045
1046
   *
   * @param donor the branch node from which the elements are taken
   * @param receiver the sibling branch node getting the elements from @c donor
   * @param parent the parent node of @c donor and @c receiver
1047
   * @param pos the position of the key in node @c parent that lies between @c donor and @c receiver
1048
   */
1049
1050
1051
1052
1053
1054
1055
1056
1057
  void balanceBranchNodes(BranchNode * const donor, BranchNode * const receiver,
                          BranchNode * const parent, const unsigned int pos) {
    auto &donorRef = *donor;
    auto &receiverRef = *receiver;
    auto &parentRef = *parent;
    assert(donorRef.numKeys > receiverRef.numKeys);

    unsigned int balancedNum = (donorRef.numKeys + receiverRef.numKeys) / 2;
    unsigned int toMove = donorRef.numKeys - balancedNum;
1058
1059
    if (toMove == 0) return;

1060
1061
    if (donorRef.keys[0] < receiverRef.keys[0]) {
      /// move from one node to a node with larger keys
1062
      unsigned int i = 0;
1063
1064
1065
1066
1067
1068
1069
      /// 1. make room
      receiverRef.children[receiverRef.numKeys + toMove] =
        receiverRef.children[receiverRef.numKeys];
      for (i = receiverRef.numKeys; i > 0; i--) {
        /// reserve space on receiver side
        receiverRef.keys[i + toMove - 1] = receiverRef.keys[i - 1];
        receiverRef.children[i + toMove - 1] = receiverRef.children[i - 1];
1070
      }
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
      /// 2. move toMove keys/children from donor to receiver
      for (i = 0; i < toMove; i++)
        receiverRef.children[i] = donorRef.children[donorRef.numKeys - toMove + 1 + i];
      for (i = 0; i < toMove - 1; i++)
        receiverRef.keys[i] = donorRef.keys[donorRef.numKeys - toMove + 1 + i];

      receiverRef.keys[toMove - 1] = parentRef.keys[pos];
      assert(parentRef.numKeys > pos);
      parentRef.keys[pos] = donorRef.keys[donorRef.numKeys - toMove];
      receiverRef.numKeys += toMove;
1081
    } else {
1082
1083
1084
      /// mode from one node to a node with smaller keys
      unsigned int i = 0, n = receiverRef.numKeys;
      /// 1. move toMove keys/children from donor to receiver
1085
      for (i = 0; i < toMove; i++) {
1086
1087
        receiverRef.children[n + 1 + i] = donorRef.children[i];
        receiverRef.keys[n + 1 + i] = donorRef.keys[i];
1088
      }
1089
1090
1091
1092
1093
1094
1095
1096
      /// 2. we have to move via the parent node: take the key from
      receiverRef.keys[n] = parentRef.keys[pos];
      receiverRef.numKeys += toMove;
      KeyType key = donorRef.keys[toMove - 1];
      /// 3. on donor node move all keys and values to the left
      for (i = 0; i < donorRef.numKeys - toMove; i++) {
        donorRef.keys[i] = donorRef.keys[toMove + i];
        donorRef.children[i] = donorRef.children[toMove + i];
1097
      }
1098
1099
1100
1101
      donorRef.children[donorRef.numKeys - toMove] = donorRef.children[donorRef.numKeys];
      /// and replace this key by donorRef.keys.get_ro()[0]
      assert(parentRef.numKeys > pos);
      parentRef.keys[pos] = key;
1102
    }
1103
    donorRef.numKeys -= toMove;
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
  }

  /**
   * Merge two leaf nodes by moving all elements from @c node2 to
   * @c node1.
   *
   * @param node1 the target node of the merge
   * @param node2 the source node
   * @return the merged node (always @c node1)
   */
1114
  pptr<LeafNode> mergeLeafNodes(const pptr<LeafNode> &node1, const pptr<LeafNode> &node2) {
1115
1116
    assert(node1 != nullptr);
    assert(node2 != nullptr);
1117
1118
    auto &node1Ref = *node1;
    auto &node2Ref = *node2;
1119
1120
1121
1122
1123
    auto &node1Bits = node1Ref.bits.get_rw();
    const auto &node2Bits = node2Ref.bits.get_ro();
    const auto n2numKeys = node2Bits.count();

    assert(node1Bits.count() + n2numKeys <= M);
1124

1125
    /// we move all keys/values from node2 to node1
1126
    auto &node1Hashs = node1Ref.fp.get_rw();
1127
1128
    auto &node1Keys = node1Ref.keys.get_rw();
    auto &node1Values = node1Ref.values.get_rw();
1129
    const auto &node2Hashs = node2Ref.fp.get_ro();
1130
1131
    const auto &node2Keys = node2Ref.keys.get_ro();
    const auto &node2Values = node2Ref.values.get_ro();
1132
    for (auto i = 0u; i < M; i++) {
1133
      if (node2Bits.test(i)) {
1134
        const auto pos = node1Bits.getFreeZero();
1135
1136
        node1Bits.set(pos);
        node1Hashs[pos] = node2Hashs[i];
1137
1138
        node1Keys[pos] = node2Keys[i];
        node1Values[pos] = node2Values[i];
1139
      }
1140
    }
1141
    node1Ref.nextLeaf = node2Ref.nextLeaf;
1142
1143
1144
1145
1146
1147
1148
1149
    if (node2Ref.nextLeaf != nullptr) {
      node2Ref.nextLeaf->prevLeaf = node1;
      PersistEmulation::writeBytes<16>();
    }
    PersistEmulation::writeBytes(
        n2numKeys * (sizeof(KeyType) + sizeof(ValueType) + 1) +  ///< moved keys, vals, hashs
        ((n2numKeys + 7) >> 3) + 16                              ///< ceiled bits + nextpointer
    );
1150
1151
1152
1153
    return node1;
  }

  /**
1154
1155
   * Merge two branch nodes my moving all keys/children from @c node to @c sibling and put the key
   * @c key from the parent node in the middle. The node @c node should be deleted by the caller.
1156
1157
1158
1159
1160
   *
   * @param sibling the left sibling node which receives all keys/children
   * @param key the key from the parent node that is between sibling and node
   * @param node the node from which we move all keys/children
   */
1161
1162
  void mergeBranchNodes(BranchNode *const sibling, const KeyType &key,
                        const BranchNode *const node) {
1163
1164
    assert(sibling != nullptr);
    assert(node != nullptr);
1165
    auto &sibRef = *sibling;
1166
    const auto &nodeRef = *node;
1167
1168
    assert(key <= nodeRef.keys[0]);
    assert(sibRef.keys[sibRef.numKeys - 1] < key);
1169

1170
1171
1172
1173
1174
    sibRef.keys[sibRef.numKeys] = key;
    sibRef.children[sibRef.numKeys + 1] = nodeRef.children[0];
    for (auto i = 0u; i < nodeRef.numKeys; i++) {
      sibRef.keys[sibRef.numKeys + i + 1] = nodeRef.keys[i];
      sibRef.children[sibRef.numKeys + i + 2] = nodeRef.children[i + 1];
1175
    }
1176
    sibRef.numKeys += nodeRef.numKeys + 1;
1177
  }
1178

1179
  /**
1180
   * Insert a leaf node into the tree for recovery.
1181
1182
1183
   *
   * @param leaf the leaf to insert
   */
1184
  void recoveryInsert(const pptr<LeafNode> &leaf) {
1185
1186
    assert(depth > 0);
    assert(leaf != nullptr);
1187

1188
    SplitInfo splitInfo;
1189
    auto hassplit = recoveryInsertInBranchNode(rootNode.branch, depth, leaf, &splitInfo);
1190

1191
    /// Check for split
1192
    if (hassplit) {
1193
      /// The root node was splitted
1194
      auto newRoot = newBranchNode();
1195
1196
1197
1198
1199
      auto &newRootRef = *newRoot;
      newRootRef.keys[0] = splitInfo.key;
      newRootRef.children[0] = splitInfo.leftChild;
      newRootRef.children[1] = splitInfo.rightChild;
      newRootRef.numKeys = 1;
1200
1201
      rootNode = newRoot;
      ++depth;
1202
1203
    }
  }
1204

1205
  /**
1206
1207
   * Insert a leaf node into the tree recursively by following the path down to the leaf level
   * starting at node @c node at depth @c depth.
1208
1209
   *
   * @param node the starting node for the insert
1210
   * @param depth the current depth of the tree (0 == leaf level)
1211
1212
1213
1214
   * @param leaf the leaf node to insert
   * @param splitInfo information about the split
   * @return true if a split was performed
   */
1215
  bool recoveryInsertInBranchNode(BranchNode * const node, const int curr_depth,
1216
                                  const pptr<LeafNode> &leaf, SplitInfo *splitInfo) {
1217
1218
1219
1220
    auto &nodeRef = *node;
    auto &leafRef = *leaf;
    auto &splitRef = *splitInfo;
    bool hassplit = false;
1221
1222
    SplitInfo childSplitInfo;

1223
    if (curr_depth == 1) {
1224
      if (nodeRef.numKeys == N) {
1225
        /* we have to insert a new right child, as the keys in the leafList are sorted */
1226
        auto newNode = newBranchNode();
1227
        newNode->children[0] = leaf;
1228
        splitRef.key =
1229
            leafRef.keys.get_ro()[findMinKeyPos(leafRef.keys.get_ro(), leafRef.bits.get_ro())];
1230
1231
        splitRef.leftChild = node;
        splitRef.rightChild = newNode;
1232
        return true;
1233
      } else {
1234
        nodeRef.keys[nodeRef.numKeys] =
1235
            leafRef.keys.get_ro()[findMinKeyPos(leafRef.keys.get_ro(), leafRef.bits.get_ro())];
1236
1237
        ++nodeRef.numKeys;
        nodeRef.children[nodeRef.numKeys] = leaf;
1238
        return false;
1239
      }
1240
    } else {
1241
1242
      hassplit = recoveryInsertInBranchNode(nodeRef.children[nodeRef.numKeys].branch,
                                            curr_depth - 1, leaf, &childSplitInfo);
1243
    }
1244
    /// Check for split
1245
    if (hassplit) {
1246
1247
      if (nodeRef.numKeys == N) {
        auto newNode = newBranchNode();
1248
        newNode->children[0] = childSplitInfo.rightChild;
1249
1250
1251
        splitRef.key = childSplitInfo.key;
        splitRef.leftChild = node;
        splitRef.rightChild = newNode;
1252
1253
        return true;
      } else {
1254
1255
        nodeRef.keys[nodeRef.numKeys] = childSplitInfo.key;
        ++nodeRef.numKeys;
1256
        nodeRef.children[nodeRef.numKeys] = childSplitInfo.rightChild;
1257
1258
1259
1260
1261
1262
1263
        return false;
      }
    }
    return false;
  }

  /**
1264
   * Print the given branch node @c node and all its children to standard output.
1265
1266
1267
1268
   *
   * @param d the current depth used for indention
   * @param node the tree node to print
   */
1269
1270
  void printBranchNode(const unsigned int d, const BranchNode * const node) const {
    const auto &nodeRef = *node;
1271
1272
    for (auto i = 0u; i < d; i++) std::cout << "  ";
    std::cout << d << " BN { ["<<node<<"] ";
1273
    for (auto k = 0u; k < nodeRef.numKeys; k++) {
1274
      if (k > 0) std::cout << ", ";
1275
      std::cout << nodeRef.keys[k];
1276
1277
    }
    std::cout << " }" << std::endl;
1278
    for (auto k = 0u; k <= nodeRef.numKeys; k++) {
1279
      if (d + 1 < depth) {
1280
        auto child = nodeRef.children[k].branch;
1281
1282
        if (child != nullptr) printBranchNode(d + 1, child);
      } else {
1283
        auto child = nodeRef.children[k].leaf;
1284
        if (child != nullptr) printLeafNode(d + 1, child);
1285
1286
1287
1288
      }
    }
  }

1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
  /**
   * Print the given leaf node @c node to standard output.
   *
   * @param d the current depth used for indention
   * @param node the tree node to print
   */
  void printLeafNode(const unsigned int d, const pptr<LeafNode> &node) const {
    const auto &nodeRef = *node;
    for (auto i = 0u; i < d; i++) std::cout << "  ";
    std::cout << "[\033[1m" << std::hex << node << std::dec << "\033[0m : ";
    for (auto i = 0u; i < M; i++) {
      if (i > 0) std::cout << ", ";
1301
      std::cout << "{(" << nodeRef.bits.get_ro()[i] << ")" << nodeRef.keys.get_ro()[i] << "}";
1302
1303
1304
1305
    }
    std::cout << "]" << std::endl;
  }

1306
  void printLeafList(){
1307
1308
1309
1310
    auto curr_node = leafList;
    while (curr_node != nullptr) {
      printLeafNode(0, curr_node);
      curr_node = curr_node->nextLeaf;
1311
1312
    }
  }
1313

1314
1315
  inline uint8_t fpHash(const KeyType &k) const {
    return (uint8_t)(k & 0xFF);
1316
  }
1317

1318
1319
1320
1321
};//end class FPTree
}//end namespace dbis::fptree

#endif
For faster browsing, not all history is shown. View entire blame