Commit 19efa9fa authored by Philipp Götze's avatar Philipp Götze
Browse files

Make tables work again for non-NVM setup too

parent 6d61f515
Pipeline #964 failed with stages
in 14 minutes and 19 seconds
......@@ -87,7 +87,7 @@ namespace pfabric {
assert(mTables[1].get() != nullptr);
SmartPtr<RecordType> tpls[2][TxSize];
auto waitTime = 1u;
restart:;
for (auto j = 0u; j < TxSize; j++) {
......@@ -106,7 +106,7 @@ namespace pfabric {
}
}
}
/* Only important for BOCC */
const auto s1 = mTables[0]->readCommit(mTxnID, mKeys, TxSize);
const auto s2 = mTables[1]->readCommit(mTxnID, mKeys, TxSize);
......
......@@ -247,12 +247,18 @@ class BOCCTable : public BaseTable,
ws.keys.emplace(e.first);
dQLock.unlockExclusive();
/// Actual insert to table
#ifdef USE_NVM_TABLES
auto pop = pmem::obj::pool_by_pptr(tbl.q);
transaction::run(pop, [&]{
for (const auto& e : writeSet.set) {
tbl.insert(std::move(e.first), std::move(e.second));
}
});
#else
for (const auto& e : writeSet.set) {
tbl.insert(std::move(e.first), std::move(e.second));
}
#endif
ws.endTS = sCtx.getNewTS(); ///< note end of transaction writing in write set
writeSet.clean();
......
......@@ -93,16 +93,19 @@ inline CuckooIterator<Iter> makeCuckooIterator(Iter j, Iter e,
template <typename RecordType, typename KeyType = DefaultKeyType>
class CuckooTable : public BaseTable {
public:
static_assert(is_tuple<RecordType>::value, "Value type must be a pfabric::Tuple");
using TupleType = typename RecordType::Base;
//< the actual implementation of the table
typedef libcuckoo::cuckoohash_map<KeyType, RecordType> TableMap;
typedef libcuckoo::cuckoohash_map<KeyType, TupleType> TableMap;
//< typedef for a updater function which returns a modification of the parameter tuple
typedef std::function<void(RecordType&)> UpdaterFunc;
typedef std::function<void(TupleType&)> UpdaterFunc;
//< typedefs for a function performing updates + deletes. Similar to UpdaterFunc
//< it allows to update the tuple, but also to delete it (indictated by the
//< setting the bool component of @c UpdateResult to false)
typedef std::function<bool(RecordType&)> UpdelFunc;
typedef std::function<bool(TupleType&)> UpdelFunc;
typedef std::function<RecordType()> InsertFunc;
......@@ -253,9 +256,9 @@ public:
* @return the number of modified tuples
*/
unsigned long updateByKey(KeyType key, UpdaterFunc ufunc) {
if (mDataTable.find_fn(key, [](RecordType r){})) {
auto res = mDataTable.update_fn(key, ufunc);
notifyObservers(res, TableParams::Update, TableParams::Immediate);
auto res = mDataTable.update_fn(key, ufunc);
if (res) {
notifyObservers(mDataTable.find(key), TableParams::Update, TableParams::Immediate);
return 1;
}
return 0;
......@@ -308,6 +311,18 @@ public:
}
}
const bool getByKey(const KeyType key, SmartPtr<RecordType> &outValue) const {
TupleType tt;
auto exists = mDataTable.find(key, tt);
if (exists) {
outValue.reset(new RecordType(tt));
return true;
}
return false;
}
/**
* @brief Return a pair of iterators for scanning the table with a
* selection predicate.
......
......@@ -290,13 +290,11 @@ class MVCCTable : public BaseTable,
}
);
writeSet.set.erase(lastIt, writeSet.set.end());
int i = 0;
for (const auto &e : writeSet.set) {
/// if entry exists
std::tuple<MVCCTuple> * tptr;
if (tbl.getAsRef(e.first, &tptr)) {
//newEntries[i] = KeyMVCCPair{e.first, ns_types::get<0>(*tbl.getByKey(e.first))};
MVCCTuple &last = std::get<0>(*tptr);;
if (updateByKey(e.first, [this, &txnID, &e](std::tuple<MVCCTuple> &tp) {
/// if entry exists
MVCCTuple &last = std::get<0>(tp);
auto iPos = getFreePos(last.usedSlots);
while (iPos > MVCCTuple::Versions - 1) {
/// If all version slots are occupied, old unused versions must be removed
......@@ -308,18 +306,20 @@ class MVCCTable : public BaseTable,
last.headers[dPos].dts = txnID;
last.headers[iPos] = {txnID, DTS_INF};
last.values[iPos] = std::move(e.second);
#ifdef USE_NVM_TABLES
pmem_drain(); ///< sfence
#endif
last.usedSlots |= (1LL << iPos);
}
})) {
/// already done in condition
} else {
/// Entry does not exist yet
else {
auto mvcc = MVCCObject<TupleType>();
mvcc.headers[0] = {txnID, DTS_INF};
mvcc.values[0] = std::move(e.second);
mvcc.usedSlots = 1;
tbl.insert(std::move(e.first), std::move(mvcc));
}
++i;
}
/* Lock Exclusively for overwriting */
......
......@@ -141,13 +141,13 @@ class PBPTreeTable : public BaseTable {
};
/** typedef for a updater function which returns a modification of the parameter tuple */
using UpdaterFunc = std::function<void(RecordType &)>;
using UpdaterFunc = std::function<void(TupleType &)>;
/** typedefs for a function performing updates + deletes. Similar to UpdaterFunc
* it allows to update the tuple, but also to delete it (indictated by the
* setting the bool component of \c UpdateResult to false)
**/
using UpdelFunc = std::function<bool(RecordType &)>;
using UpdelFunc = std::function<bool(TupleType &)>;
using InsertFunc = std::function<RecordType()>;
......@@ -255,12 +255,15 @@ class PBPTreeTable : public BaseTable {
* The actual modification is done by the updater function specified as parameter.
*
* \param key the key of the tuple to be modified
* \param func a function performing the modification by returning a modified
* tuple
* \param func a function performing the modification by updating the value in-place
* \return the number of modified tuples
*****************************************************************************/
unsigned long updateByKey(KeyType key, UpdaterFunc ufunc) {
//TODO:
TupleType * val;
if (getAsRef(key, &val)) {
ufunc(*val);
return 1;
}
return 0;
}
......
......@@ -254,10 +254,14 @@ class S2PLTable : public BaseTable,
locks.lockExclusive(key);
wKeysLocked.push_back(key);
// insert
#if USE_NVM_TABLES
auto pop = pmem::obj::pool_by_pptr(tbl.q);
transaction::run(pop, [&] {
tbl.insert(key, rec);
});
#else
tbl.insert(key, rec);
#endif
}
/**
......
......@@ -28,7 +28,9 @@
#include <unordered_map>
#include "core/PFabricTypes.hpp"
#ifdef USE_NVM_TABLES
#include <libpmem.h>
#endif
namespace pfabric {
......@@ -207,9 +209,13 @@ class StateContext {
/** Set last committed transaction ID (snapshot version) */
void setLastCTS(const GroupID topoID, const TransactionID txnID) {
#ifdef USE_NVM_TABLES
pmem_drain();
topoGrps[topoID].second.store(txnID, std::memory_order_relaxed);
pmem_persist(&topoGrps[topoID].second, sizeof(TransactionID));
#else
topoGrps[topoID].second.store(txnID, std::memory_order_relaxed);
#endif
}
/** Get oldest currently visible version; used for garbage collection */
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment