Commit 0f546ac4 authored by Philipp Götze's avatar Philipp Götze
Browse files

Make MVCC table to use atomics and manual flushes

parent 00c7c686
......@@ -54,33 +54,89 @@ namespace pfabric {
* @brief MVCC wrapper for a given RecordType
******************************************************************************/
template <typename RecordType, std::size_t VERSIONS = 2>
class MVCCObject {
public:
std::uint64_t usedSlots;
struct MVCCObject {
static constexpr std::size_t Versions = VERSIONS;
struct header {
TransactionID cts;
TransactionID dts;
std::atomic<TransactionID> cts{DTS_INF};
std::atomic<TransactionID> dts{0};
header() noexcept : cts{DTS_INF}, dts{0} {}
header(const header& rhs) noexcept : cts{rhs.cts.load()}, dts{rhs.dts.load()} {}
header(header&& rhs) noexcept : cts{rhs.cts.load()}, dts{rhs.dts.load()} {}
header& operator=(const header& rhs) {
cts.store(rhs.cts.load());
dts.store(rhs.dts.load());
return *this;
}
};
static const std::size_t Versions = VERSIONS;
std::array<header, VERSIONS> headers;
std::atomic_uint8_t usedSlots{0};
std::array<RecordType, VERSIONS> values;
// std::mutex mtx;
/** Default Constructor */
MVCCObject() noexcept : headers{}, usedSlots{0}, values{} {}
/** Copy Constructor */
MVCCObject(const MVCCObject& rhs) noexcept : headers{rhs.headers},
usedSlots{rhs.usedSlots.load()},
values{rhs.values} {}
/** Move Constructor */
MVCCObject(MVCCObject&& rhs) noexcept : headers{std::move(rhs.headers)},
usedSlots{rhs.usedSlots.load()},
values{std::move(rhs.values)} {}
/** Copy Assignment Operator */
MVCCObject& operator=(const MVCCObject& rhs) {
usedSlots.store(rhs.usedSlots.load());
headers = rhs.headers;
values = rhs.values;
return *this;
}
int getCurrent(const TransactionID txnID) const {
inline int getCurrent(const TransactionID& txnID) const {
// const auto slots = usedSlots.load();
for (auto i = 0u; i < VERSIONS; ++i) {
if (usedSlots & (1 << i) &&
headers[i].cts <= txnID && headers[i].dts > txnID)
if (usedSlots.load() & (1 << i)) {
// const std::lock_guard<std::mutex> lock(mtx); //TODO: would need a NUMA suitable lock (PMDK?)
if (headers[i].dts.load() > txnID &&
headers[i].cts.load() <= txnID)
return i;
}
}
return -1;
}
void cleanUpVersions(const TransactionID oldestReadVersion) {
for(auto i = 0u; i < VERSIONS; ++i) {
if (headers[i].dts <= oldestReadVersion)
usedSlots &= ~(1 << i);
}
inline void newEntry(const TransactionID& txnID, const TransactionID& iPos, const TransactionID& dPos,
const RecordType &value) {
values[iPos] = std::move(value);
headers[iPos].cts.store(txnID);
headers[iPos].dts.store(DTS_INF);
headers[dPos].dts.store(txnID);
#ifdef USE_NVM_TABLES
/// Flush is necessary for failure-atomicity;
// pmem_flush(&headers, sizeof(headers));
// pmem_flush(&values[iPos], sizeof(RecordType));
pmem_flush(this, sizeof(MVCCObject<RecordType>));
pmem_drain();
#endif
usedSlots.store(usedSlots.load() | (1LL << iPos));
#ifdef USE_NVM_TABLES
pmem_flush(&usedSlots, sizeof(usedSlots));
// pmem_drain();
#endif
}
inline void cleanUpVersions(const TransactionID& oldestReadVersion) {
/// TODO: needs persistent guarantees?
for(auto i = 0u; i < VERSIONS; ++i) {
if (headers[i].dts.load() <= oldestReadVersion) {
usedSlots.store(usedSlots.load() & ~(1 << i));
headers[i].cts.store(DTS_INF);
headers[i].dts.store(0);
}
}
}
};
......@@ -239,6 +295,10 @@ class MVCCTable : public BaseTable,
tblID = sCtx.registerState(this->shared_from_this());
}
TableID getID() const {
return tblID;
}
void transactionBegin(const TransactionID& txnID) {
sCtx.txCntW++;
writeSet.txnID = txnID;
......@@ -257,7 +317,7 @@ class MVCCTable : public BaseTable,
s = this->transactionCommit(txnID);
if (s != Errc::SUCCESS) return s;
s = sCtx.regStates[otherID]->transactionCommit(txnID);
sCtx.setLastCTS(0, txnID); //< this has actually to be persistent
sCtx.setLastCTS(0, txnID);
sCtx.removeTx(txnID);
}
......@@ -271,16 +331,11 @@ class MVCCTable : public BaseTable,
Errc transactionCommit(const TransactionID& txnID) {
using MVCCTuple = MVCCObject<TupleType>;
//struct KeyMVCCPair {
// KeyType key;
// MVCCTuple* mvcc;
//};
const auto numEntries = writeSet.set.size();
//auto *newEntries = new KeyMVCCPair[numEntries];
/* Buffer new MVCC entries */
/// remove duplicates
/// Remove duplicates
std::sort(writeSet.set.begin(), writeSet.set.end());
auto lastIt = std::unique(writeSet.set.begin(), writeSet.set.end(),
[](const typename WriteSetType::Pair &a, const typename WriteSetType::Pair &b) -> bool {
......@@ -289,49 +344,92 @@ class MVCCTable : public BaseTable,
);
writeSet.set.erase(lastIt, writeSet.set.end());
/// Update/Insert all entries from writeSet in underlying table
/// In-place variant ======================================================
///*
for (const auto &e : writeSet.set) {
if (updateByKey(e.first, [this, &txnID, &e](std::tuple<MVCCTuple> &tp) {
/// if entry exists
MVCCTuple &last = std::get<0>(tp);
/// If entry exists
const auto success = updateByKey(e.first, [this, &txnID, &e](std::tuple<MVCCTuple> &tp) {
MVCCTuple &mvcc = std::get<0>(tp);
const auto dPos = mvcc.getCurrent(txnID);
auto iPos = getFreePos(mvcc.usedSlots.load());
while (iPos > MVCCTuple::Versions - 1) {
/// If all version slots are occupied, old unused versions must be removed; this is the
/// only necessary possible waiting point
mvcc.cleanUpVersions(sCtx.recalcOldestVisible(txnID));
iPos = getFreePos(mvcc.usedSlots.load());
}
/// No need for synchronization; only possible problem is if readers access at the same time
/// they could end up with no fitting version and would restart (but no consistency or
/// failure-atomicity issue here)
mvcc.newEntry(txnID, iPos, dPos, e.second);
});
/// Entry does not exist yet
if (!success) {
MVCCObject<TupleType> mvcc{};
mvcc.headers[0].cts.store(txnID);
mvcc.headers[0].dts.store(DTS_INF);
mvcc.values[0] = std::move(e.second);
mvcc.usedSlots.store(1);
/// Doesn't need synchronization in our use case as inserts only happen during preparation
tbl.insert(std::move(e.first), std::move(mvcc));
}
}//*/
/// Out-of-place variant ===================================================
/*struct KeyMVCCPair {
KeyType key;
MVCCObject<TupleType> mvcc;
};
std::vector<KeyMVCCPair> newEntries(numEntries);
/// Buffer new MVCC entries
int i = 0;
for(const auto &e : writeSet.set) {
newEntries[i].key = e.first;
/// if entry exists
SmartPtr<pfabric::Tuple<MVCCTuple>> tplPtr;
if (tbl.getByKey(e.first, tplPtr)) {
newEntries[i].mvcc = ns_types::get<0>(*tplPtr);
auto &last = newEntries[i].mvcc;
const auto dPos = last.getCurrent(txnID);
auto iPos = getFreePos(last.usedSlots);
while (iPos > MVCCTuple::Versions - 1) {
/// If all version slots are occupied, old unused versions must be removed
last.cleanUpVersions(sCtx.getOldestVisible());
last.cleanUpVersions(sCtx.recalcOldestVisible(txnID));
iPos = getFreePos(last.usedSlots);
}
const auto dPos = last.getCurrent(txnID);
last.headers[dPos].dts = txnID;
last.headers[iPos] = {txnID, DTS_INF};
last.values[iPos] = std::move(e.second);
#ifdef USE_NVM_TABLES
pmem_drain(); ///< sfence
#endif
last.headers[iPos].cts = txnID;
last.headers[iPos].dts = DTS_INF;
last.values[iPos] = e.second;
last.usedSlots |= (1LL << iPos);
})) {
/// already done in condition
} else {
/// Entry does not exist yet
auto mvcc = MVCCObject<TupleType>();
mvcc.headers[0] = {txnID, DTS_INF};
mvcc.values[0] = std::move(e.second);
mvcc.usedSlots = 1;
tbl.insert(std::move(e.first), std::move(mvcc));
newEntries[i].mvcc = MVCCTuple();
auto &last = newEntries[i].mvcc;
last.headers[0].cts.store(txnID);
last.headers[0].dts.store(DTS_INF);
last.values[0] = std::move(e.second);
last.usedSlots.store(1);
}
++i;
}
/* Lock Exclusively for overwriting */
/* Write new Entries */
/* Unlock all*/
//for (auto e = 0u; e < numEntries; ++e) {
// locks.lockExclusive(newEntries[e].key);
// tbl.insert(std::move(newEntries[e].key), std::move(newEntries[e].mvcc));
// locks.unlockExclusive(newEntries[e].key);
//}
/// Lock Exclusively for overwriting
/// Write new Entries
/// Unlock all
auto pop = pool_by_pptr(tbl.btree);
transaction::run(pop, [&] {
for(const auto &e : newEntries) {
locks.lockExclusive(e.key);
tbl.insert(std::move(e.key), std::move(e.mvcc));
locks.unlockExclusive(e.key);
}
});*/
/* Clean up */
writeSet.clean();
//delete [] newEntries;
return Errc::SUCCESS;
}
......@@ -340,12 +438,12 @@ class MVCCTable : public BaseTable,
}
Errc readCommit(const TransactionID txnID, KeyType* keys, size_t until) {
//nothing to do here
/// nothing to do here
return Errc::SUCCESS;
}
void cleanUpReads(KeyType* keys, size_t until) {
//nothing to do here
/// nothing to do here
}
/*==========================================================================*
......@@ -478,28 +576,37 @@ class MVCCTable : public BaseTable,
}
}
/* Get MVCC Object */
// locks.lockShared(key);
SmartPtr<pfabric::Tuple<MVCCObject<TupleType>>> tplPtr;
if (!tbl.getByKey(key, tplPtr)) {
// locks.unlockShared(key);
/// in-place variant
///*
std::tuple<MVCCObject<TupleType>> * tplPtr = nullptr;
if (!tbl.getAsRef(key, &tplPtr)) {
return Errc::NOT_FOUND;
}
auto& mvcc = ns_types::get<0>(*tplPtr);//*/
/// out-of-place variant
/*
locks.lockShared(key);
std::tuple<MVCCObject<TupleType>> * tplPtr = nullptr;
if (!tbl.getAsRef(key, &tplPtr)) {
locks.unlockShared(key);
return Errc::NOT_FOUND;
}
const auto& mvcc = ns_types::get<0>(*tplPtr);
// locks.unlockShared(key);
const auto &mvcc = ns_types::get<0>(*tplPtr);
locks.unlockShared(key);*/
/* Get read CTS (version that was read first) for consistency */
auto& readCTS = sCtx.getReadCTS(txnID, 0);
if(readCTS == 0) {
/* first read operation by this txnID --> save snapshot version */
readCTS = sCtx.getLastCTS(0);
//sCtx.setReadCTS(txnID, 0, readCTS);
}
/* Read visible tuple,
* uses readCTS instead of txnID to avoid reading of delayed writes */
const auto pos = mvcc.getCurrent(readCTS);
if (pos == -1)
if (pos == -1) {
return Errc::NOT_FOUND;
}
/* Set out and return value */
outValue.reset(new RecordType(mvcc.values[pos]));
......@@ -557,7 +664,7 @@ class MVCCTable : public BaseTable,
/*==========================================================================*
* Members *
*==========================================================================*/
// RWLocks<KeyType> locks;
//RWLocks<KeyType> locks;
WriteSetType writeSet;
Table tbl;
TableID tblID;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment