Commit be97a7a6 authored by Philipp Götze's avatar Philipp Götze
Browse files

Generalized Tx-Processing for arbitrary number of states

parent 0f62a3af
......@@ -55,7 +55,7 @@ typedef std::size_t TupleLimit;
typedef unsigned int SlideLength;
/// unique identifier of a transaction
using TransactionID = unsigned long;
using TransactionID = unsigned long long;
/// vector of strings
typedef std::vector< std::string > StringTuple;
......
......@@ -61,7 +61,7 @@ namespace pfabric {
* @param tbl the table that is read
* @param pred an optional filter predicate
*/
FromTxTables(SCtxType& sCtx): mTables{sCtx.regStates[0], sCtx.regStates[1]}, mSCtx{sCtx} {}
FromTxTables(SCtxType& sCtx): mTables{sCtx.regStates}, mSCtx{sCtx} {}
/**
* Deallocates all resources.
......@@ -74,69 +74,99 @@ namespace pfabric {
KeyType mKeys[TxSize];
if (mSCtx.usingZipf) {
for(auto i = 0u; i < TxSize; i++)
for (auto i = 0u; i < TxSize; i++)
mKeys[i] = mSCtx.zipfGen->nextValue();
} else {
for(auto i = 0u; i < TxSize; i++)
for (auto i = 0u; i < TxSize; i++)
mKeys[i] = mSCtx.dis->operator()(mSCtx.rndGen);
}
assert(mTables[0].get() != nullptr);
assert(mTables[1].get() != nullptr);
for (auto s = 0u; s < MAX_STATES_TOPO; ++s) {
assert(mTables[s].get() != nullptr);
}
SmartPtr<RecordType> tpls[2][TxSize];
SmartPtr<RecordType> tpls[MAX_STATES_TOPO][TxSize];
auto waitTime = 1u;
restart:;
for (auto j = 0u; j < TxSize; j++) {
for (auto i = 0u; i < 2; i++) {
for (auto j = 0u; j < TxSize; ++j) {
for (auto i = 0u; i < MAX_STATES_TOPO; ++i) {
if (mTables[i]->getByKey(mTxnID, mKeys[j], tpls[i][j]) != Errc::SUCCESS) {
/* restart, caused by inconsistency or other erros */
// std::cout << "Key: " << mKeys[j] << std::endl;
// std::cout << "Key: " << mKeys[j] << std::endl;
mSCtx.restarts++;
mTables[0]->cleanUpReads(mKeys, i?j+1:j);
mTables[1]->cleanUpReads(mKeys, j);
for (auto s = 0u; s < MAX_STATES_TOPO; ++s) {
mTables[s]->cleanUpReads(mKeys, (i > s)? j+1 : j);
}
mSCtx.setReadCTS(mTxnID, 0, 0);
boost::this_thread::sleep_for(boost::chrono::nanoseconds(50*TxSize*waitTime));
// waitTime *= 2;
// boost::this_thread::interruption_point();
boost::this_thread::sleep_for(boost::chrono::nanoseconds(500*TxSize*waitTime));
// waitTime *= 2;
//boost::this_thread::interruption_point();
goto restart;
}
}
}
/* Only important for BOCC */
const auto s1 = mTables[0]->readCommit(mTxnID, mKeys, TxSize);
const auto s2 = mTables[1]->readCommit(mTxnID, mKeys, TxSize);
if(s1 != Errc::SUCCESS || s2 != Errc::SUCCESS) {
std::array<Errc, MAX_STATES_TOPO> status;
for (auto s = 0u; s < MAX_STATES_TOPO; ++s) {
status[s] = mTables[s]->readCommit(mTxnID, mKeys, TxSize);
}
if (std::any_of(status.begin(), status.end(), [](Errc e){ return e != Errc::SUCCESS;})) {
mSCtx.restarts++;
mSCtx.removeTx(mTxnID);
mTxnID = mSCtx.newTx();
goto restart;
}
/* check if same for correctness criteria */
// for (auto j = 0u; j < TxSize; j++) {
// if (std::get<2>(*tpls[0][j]) != std::get<2>(*tpls[1][j]))
// std::cout << "ERROR: INCONSISTENT READ\n";
// }
/* check if same for correctness criteria *//*
using KeyType = typename std::tuple_element<1, typename RecordType::Base>::type;
using ElementType = typename std::tuple_element<2, typename RecordType::Base>::type;
for (auto s = 0u; s < MAX_STATES_TOPO; ++s) {
std::array<SmartPtr<RecordType>, TxSize> tpls_sep;
std::copy_n(std::begin(tpls[s]), TxSize, std::begin(tpls_sep));
std::sort(tpls_sep.begin(), tpls_sep.end(), [](SmartPtr<RecordType> a, SmartPtr<RecordType> b) {
return std::get<1>(*a) > std::get<1>(*b);
});
KeyType prevKey = 0;
ElementType prevValue = 0;
for (auto o = 0u; o < TxSize; ++o) {
if (std::get<1>(*tpls_sep[o]) == prevKey) {
if (std::get<2>(*tpls_sep[o]) != prevValue) {
std::cout << "ERROR: INCONSISTENT LOCAL READ\n";
}
} else {
prevKey = std::get<1>(*tpls_sep[o]);
prevValue = std::get<2>(*tpls_sep[o]);
}
}
}
for (auto o = 0u; o < TxSize; ++o) {
std::array<ElementType, MAX_STATES_TOPO> elements;
for (auto s = 0u; s < MAX_STATES_TOPO; ++s) {
elements[s] = std::get<2>(*tpls[s][o]);
}
if (std::any_of(elements.begin()+1, elements.end(), [&](ElementType el) {return el != elements[0];}))
std::cout << "ERROR: INCONSISTENT ACROSS STATE READ\n";
}*/
/* when everything consistent, publish the tuples */
for (auto i = 0u; i < 2; i++) {
for (auto j = 0u; j < TxSize; j++) {
this->getOutputDataChannel().publish(tpls[i][j], false);
for (auto s = 0u; s < MAX_STATES_TOPO; ++s) {
for (auto o = 0u; o < TxSize; ++o) {
this->getOutputDataChannel().publish(tpls[s][o], false);
}
}
this->getOutputPunctuationChannel().publish(PunctuationPtr(new Punctuation(Punctuation::EndOfStream)));
mTables[0]->cleanUpReads(mKeys, TxSize);
mTables[1]->cleanUpReads(mKeys, TxSize);
for (auto s = 0u; s < MAX_STATES_TOPO; ++s) {
mTables[s]->cleanUpReads(mKeys, TxSize);
}
mSCtx.removeTx(mTxnID);
return 2*TxSize;
return MAX_STATES_TOPO*TxSize;
}
private:
const TablePtr mTables[2]; //< the table from which the tuples are fetched
const std::array<TablePtr, MAX_STATES_TOPO> mTables; //< the table from which the tuples are fetched
SCtxType& mSCtx;
};
......
......@@ -55,7 +55,7 @@ namespace pfabric {
/** the function for deriving the TransactionID of an incoming stream element */
using TxIDFunc = std::function<TransactionID(const StreamElement&)>;
/**
* Create a new ToTable operator to store incoming tuples in the
* given table.
......@@ -90,13 +90,18 @@ namespace pfabric {
* the incoming punctuation tuple
*/
void processPunctuation(const PunctuationPtr& punctuation) {
if (punctuation->ptype() == Punctuation::TxCommit)
mTable->transactionPreCommit(boost::any_cast<TransactionID>(punctuation->data()));
else if (punctuation->ptype() == Punctuation::TxAbort)
mTable->transactionAbort(boost::any_cast<TransactionID>(punctuation->data()));
else if (punctuation->ptype() == Punctuation::TxBegin)
mTable->transactionBegin(boost::any_cast<TransactionID>(punctuation->data()));
switch (punctuation->ptype()) {
case Punctuation::TxBegin:
mTable->transactionBegin(boost::any_cast<TransactionID>(punctuation->data()));
break;
case Punctuation::TxCommit:
mTable->transactionPreCommit(boost::any_cast<TransactionID>(punctuation->data()));
break;
case Punctuation::TxAbort:
mTable->transactionAbort(boost::any_cast<TransactionID>(punctuation->data()));
break;
default: break;
}
this->getOutputPunctuationChannel().publish(punctuation);
}
......
......@@ -52,7 +52,7 @@ uint8_t getSetFreePos(std::atomic<std::uint64_t> &v) {
do {
pos = getFreePos(expected); //TODO: catch if no free position
} while(!v.compare_exchange_weak(
expected,
expected,
expected | (1ULL << pos), //< set bit at pos
std::memory_order_relaxed));
return pos;
......@@ -76,40 +76,4 @@ unsigned int hashMe(unsigned int x) {
return x;
}
ZipfianGenerator::ZipfianGenerator(unsigned int min, unsigned int max, double zipfianconstant = ZIPFIAN_CONSTANT)
: items{max - min + 1}, base{min}, zipfianconstant{zipfianconstant}, theta{zipfianconstant} {
for(auto i = 0Lu; i < items; i++)
zetan += 1 / (std::pow(i + 1, theta));
for(auto i = 0Lu; i < 2; i++)
zeta2theta += 1 / (std::pow(i + 1, theta));
alpha = 1.0 / (1.0 - theta);
eta = (1 - std::pow(2.0 / items, 1 - theta)) / (1 - zeta2theta / zetan);
nextValue();
}
//unsigned int ZipfianGenerator::nextValue() { return nextInt(items); }
/* Scrambled version */
unsigned int ZipfianGenerator::nextValue() {
auto ret = nextInt(items);
return base + hashMe(ret) % items;
}
unsigned int ZipfianGenerator::nextInt(unsigned int itemcount) {
double u = dist(gen);
double uz = u * zetan;
if (uz < 1.0) { return base;}
if (uz < 1.0 + std::pow(0.5, theta)) { return base + 1; }
unsigned int ret = base + (int) ((itemcount) * std::pow(eta * u - eta + 1, alpha));
return ret;
}
} /* end namespace pfabric */
......@@ -27,13 +27,28 @@
#include "core/PFabricTypes.hpp"
#ifdef USE_NVM_TABLES
#include "pfabric_config.h"
#include <libpmem.h>
#include <libpmemobj++/pool.hpp>
#include <libpmemobj++/transaction.hpp>
#include <libpmemobj++/make_persistent.hpp>
#include <libpmemobj++/persistent_ptr.hpp>
#endif
namespace pfabric {
using pmem::obj::delete_persistent;
using pmem::obj::make_persistent;
using pmem::obj::persistent_ptr;
using pmem::obj::pool;
using pmem::obj::transaction;
using TableID = unsigned short;
/// Settings, TODO: maybe these should rather be template arguments
constexpr auto MAX_TOPO_GRPS = 1; ///< number of allowed topology groups
constexpr auto MAX_STATES = 2; ///< number of globally allowed states
constexpr auto MAX_STATES_TOPO = 2; ///< number of allowed states per topology group
/** Infinity, used for maximum validity */
constexpr auto DTS_INF = std::numeric_limits<TransactionID>::max();
/** Possible isolation levels */
......@@ -52,20 +67,43 @@ unsigned int hashMe(unsigned int x);
/** Derived from YCSB.
* see: https://github.com/brianfrankcooper/YCSB/blob/master/core/src/main/java/com/yahoo/ycsb/generator/ZipfianGenerator.java
*/
template<typename T>
class ZipfianGenerator {
public:
static constexpr double ZIPFIAN_CONSTANT = 0.99;
ZipfianGenerator(unsigned int min, unsigned int max, double zipfianconstant);
unsigned int nextValue();
ZipfianGenerator(T min, T max, double zipfianconstant = ZIPFIAN_CONSTANT)
: items{max - min + 1}, base{min}, zipfianconstant{zipfianconstant},
theta{zipfianconstant} {
for(auto i = 0Lu; i < items; i++)
zetan += 1 / (std::pow(i + 1, theta));
for(auto i = 0Lu; i < 2; i++)
zeta2theta += 1 / (std::pow(i + 1, theta));
alpha = 1.0 / (1.0 - theta);
eta = (1 - std::pow(2.0 / items, 1 - theta)) / (1 - zeta2theta / zetan);
nextValue();
}
/* Scrambled version */
T nextValue() {
auto ret = nextInt(items);
return base + 1 + hashMe(ret) % (items-1); ///TODO: Key 0 bugs and is excluded for now
}
private:
unsigned int nextInt(unsigned int itemcount);
T nextInt(size_t itemcount) {
double u = dist(gen);
double uz = u * zetan;
if (uz < 1.0) { return base;}
if (uz < 1.0 + std::pow(0.5, theta)) { return base + 1; }
return base + (int) ((itemcount) * std::pow(eta * u - eta + 1, alpha));
}
/** Number of items. */
const unsigned int items;
const size_t items;
/** Min item to generate. */
const unsigned int base;
const T base;
/** The zipfian constant to use. */
const double zipfianconstant;
......@@ -89,19 +127,30 @@ class StateContext {
using LastCTS = TransactionID;
using GroupID = unsigned short;
using TablePtr = std::shared_ptr<TableType>;
using TopoGrp = std::pair<std::array<TablePtr,2>, std::atomic<LastCTS>>;
using WriteInfo = std::array<Status,2>; //std::tuple<TableID, Status>;
using ReadInfo = std::array<ReadCTS,1>; //std::tuple<TopologyID, ReadCTS>;
using TopoGrp = std::pair<std::array<TableID, MAX_STATES_TOPO>, std::atomic<LastCTS>>;
using WriteInfo = std::array<Status, MAX_STATES_TOPO>; //std::tuple<TableID, Status>;
using ReadInfo = std::array<ReadCTS, MAX_TOPO_GRPS>; //std::tuple<TopologyID, ReadCTS>;
using ActiveTx = std::tuple<TransactionID, WriteInfo, ReadInfo>;
public:
/** Atomic counter for assigning global transaction IDs */
std::atomic<TransactionID> nextTxID{1};
std::atomic<TransactionID> nextTxID{
std::chrono::duration_cast<std::chrono::duration<long long unsigned int, std::nano>>(
std::chrono::high_resolution_clock::now().time_since_epoch()).count()
};
/** Registered States and Topology Groups (currently hard-coded, not thread-safe) */
TablePtr regStates[2];
TopoGrp topoGrps[1];
#ifdef USE_NVM_TABLES
struct sCtxRoot {
persistent_ptr<std::array<TopoGrp, MAX_TOPO_GRPS>> topoGrps;
GroupID numGrps;
};
pool<sCtxRoot> pop;
#endif
std::array<TopoGrp, MAX_TOPO_GRPS> * topoGrps;
std::array<TablePtr, MAX_STATES> regStates;
/** Mapping from internal transaction ID to gloabl transaction ID */
std::unordered_map<TransactionID, TransactionID> tToTX;
std::mutex mtx{};
/*---- Only for evaluation -------------------------------------------------*/
/** Counting necessary restarts of txs */
......@@ -112,21 +161,48 @@ class StateContext {
std::mt19937 rndGen{std::random_device{}()};
/** Distribution settings */
bool usingZipf = false;
std::unique_ptr<ZipfianGenerator> zipfGen;
std::unique_ptr<ZipfianGenerator<KeyType>> zipfGen;
std::unique_ptr<std::uniform_int_distribution<KeyType>> dis;
void setDistribution(bool zipf, KeyType min, KeyType max, double zipfConst = 0.0) {
usingZipf = zipf;
dis.reset(new std::uniform_int_distribution<KeyType>{min, max});
if(zipf) {
zipfGen.reset(new ZipfianGenerator{min, max, zipfConst});
zipfGen.reset(new ZipfianGenerator<KeyType>{min, max, zipfConst});
}
}
/*--------------------------------------------------------------------------*/
StateContext() {
#ifdef USE_NVM_TABLES
const std::string path = pfabric::gPmemPath + "StateContext"; ///TODO: needs a unique ID
if (access(path.c_str(), F_OK) != 0) {
pop = pool<sCtxRoot>::create(path, "StateContext");
transaction::run(pop, [&] {
pop.root()->topoGrps = make_persistent<std::array<TopoGrp, MAX_TOPO_GRPS>>();
pop.root()->numGrps = 0;
});
} else {
pop = pool<sCtxRoot>::open(path, "StateContext");
}
topoGrps = pop.root()->topoGrps.get();
numGroups = &pop.root()->numGrps;
#else
topoGrps = new std::array<TopoGrps, MAX_TOPO_GRPS>;
numGroups = new GroupID{0u};
#endif
}
~StateContext() {
#ifdef USE_NVM_TABLES
pop.close();
#else
delete topoGrps;
#endif
}
/** Get status of a writing transaction; either active, commit or abort */
const Status &getWriteStatus(const TransactionID txnID,
const TableID tblID) const {
const Status &getWriteStatus(const TransactionID txnID, const TableID tblID) const {
return std::get<1>(activeTxs[getPosFromTxnID(txnID)])[tblID];
}
......@@ -135,8 +211,7 @@ class StateContext {
}
/** Get status of a reading transaction; returns read snapshot version */
const ReadCTS getReadCTS(const TransactionID txnID,
const GroupID topoID) const {
ReadCTS getReadCTS(const TransactionID txnID, const GroupID topoID) const {
return std::get<2>(activeTxs[getPosFromTxnID(txnID)])[topoID];
}
......@@ -149,99 +224,117 @@ class StateContext {
std::get<2>(activeTxs[getPosFromTxnID(txnID)])[topoID] = read;
}
const TransactionID getOldestActiveTx() const {
TransactionID getOldestActiveTx() const {
auto oldest = DTS_INF;
const auto slots = usedSlots.load(std::memory_order_relaxed);
for(int pos = 0; pos < 64; ++pos) {
const auto bTS = std::get<0>(activeTxs[pos]);
if((slots & (1ULL << pos)) && bTS < oldest)
oldest = bTS;
const auto &slots = usedSlots.load(std::memory_order_relaxed);
for(auto pos = 0u; pos < 64; ++pos) {
if (slots & (1ULL << pos)) {
const auto bTS = std::get<0>(activeTxs[pos]);
oldest = std::min(bTS, oldest);
}
}
return oldest;
}
/** Registers a new transaction to the context */
const TransactionID newTx() {
TransactionID newTx() {
const auto txnID = nextTxID.fetch_add(1);
const auto pos = getSetFreePos(usedSlots);
activeTxs[pos] = std::make_tuple(txnID,
WriteInfo{{Status::Active, Status::Active}}, //< TableID | Status
ReadInfo{{0}}); //< GroupID | LastCommitID
activeTxs[pos] = std::make_tuple(txnID, WriteInfo{}, ReadInfo{});
std::get<1>(activeTxs[pos]).fill(Status::Active); ///< TableID | Status
std::get<2>(activeTxs[pos]).fill(0); ///< GroupID | LastCommitID
return txnID;
}
const TransactionID getNewTS() {
TransactionID getNewTS() {
return nextTxID.fetch_add(1);
}
/** Removes a transaction from the context; possibly has to recalculate the
* oldest visible version*/
/** Removes a transaction from the context */
void removeTx(const TransactionID txnID) {
const auto readCTS = getReadCTS(txnID, 0);
setReadCTS(txnID, 0, 0);
for (auto topo = 0u; topo < MAX_TOPO_GRPS; ++topo)
setReadCTS(txnID, topo, 0); ///< reset ReadCTSs for next transaction
unsetPos(usedSlots, getPosFromTxnID(txnID)); //< release slot
}
/** Recalculate the oldest visible version */
TransactionID recalcOldestVisible(const TransactionID txnID) {
TransactionID min = oldestVisibleVersion.load(std::memory_order_relaxed);
auto newMin = DTS_INF;
/* find new minimum */
if(min != 0) {
auto newMin = DTS_INF;
const auto slots = usedSlots.load(std::memory_order_relaxed);
for(int pos = 0; pos < 64; ++pos) {
const auto rCTS = std::get<2>(activeTxs[pos])[0];
if((slots & (1ULL << pos)) && rCTS != 0 && rCTS < newMin)
newMin = rCTS;
for(auto pos = 0u; pos < 64; ++pos) {
if (slots & (1ULL << pos)) {
const auto rCTS = std::get<2>(activeTxs[pos])[0];
if (rCTS != 0 && rCTS < newMin)
newMin = rCTS;
}
}
/* no other active Tx, use last Snapshot */
if (newMin == DTS_INF) newMin = getLastCTS(0);
while(min < newMin && !oldestVisibleVersion.compare_exchange_weak(min, newMin, std::memory_order_relaxed));
} else if(min == 0) {
const auto newMin = getLastCTS(0);
newMin = getLastCTS(0);
while(!oldestVisibleVersion.compare_exchange_weak(min, newMin, std::memory_order_relaxed));
}
return newMin;
}
/** Get last committed transaction ID (snapshot version) */
const TransactionID getLastCTS(const GroupID topoID) {
return topoGrps[topoID].second.load(std::memory_order_relaxed);
TransactionID getLastCTS(const GroupID topoID) {
return (*topoGrps)[topoID].second.load(std::memory_order_relaxed);
}
/** Set last committed transaction ID (snapshot version) */
void setLastCTS(const GroupID topoID, const TransactionID txnID) {
#ifdef USE_NVM_TABLES
pmem_drain();
topoGrps[topoID].second.store(txnID, std::memory_order_relaxed);
pmem_persist(&topoGrps[topoID].second, sizeof(TransactionID));
(*topoGrps)[topoID].second.store(txnID, std::memory_order_relaxed);
pmem_persist(&(*topoGrps)[topoID].second, sizeof(TransactionID));
#else
topoGrps[topoID].second.store(txnID, std::memory_order_relaxed);
(*topoGrps)[topoID].second.store(txnID, std::memory_order_relaxed);
#endif
}
/** Get oldest currently visible version; used for garbage collection */
const TransactionID getOldestVisible() const{
TransactionID getOldestVisible() const {
return oldestVisibleVersion.load(std::memory_order_relaxed);
}
/** Register a new state/table to the context */
const TableID registerState(const TablePtr tbl) {
TableID registerState(const TablePtr tbl) {
regStates[numStates] = tbl;
return numStates++;
}
/** Register a new topology/continuous query to the context */
const GroupID registerTopo(const std::array<TablePtr,2> &tbls) {
topoGrps[numGroups] = std::make_pair(tbls, 0);
return numGroups++;
GroupID registerTopo(const std::array<TableID, MAX_STATES_TOPO> &tbls) {
auto &numGrps = *numGroups;
(*topoGrps)[numGrps] = std::make_pair(tbls, 0);
pmem_flush(&(*topoGrps)[numGrps], sizeof(TableID) * MAX_STATES_TOPO + sizeof(LastCTS));
++numGrps;
pmem_persist(numGroups, sizeof(GroupID));
return numGrps-1;
}
/** Update the table ID of an existing topology group */
void updateTopo(const GroupID topoID, const std::array<TableID, MAX_STATES_TOPO> &tbls) {
(*topoGrps)[topoID].first = tbls; ///< the tableIDs doesn't need to be persistent
}
void reset() {
/* Make sure no thread is using the context anymore! */
nextTxID.store(1);
nextTxID.store(
std::chrono::duration_cast<std::chrono::duration<long long unsigned int, std::nano>>(
std::chrono::high_resolution_clock::now().time_since_epoch()).count());
restarts.store(0);
txCntR.store(0);
txCntW.store(0);
usedSlots.store(0);
oldestVisibleVersion.store(1);
topoGrps[0].second.store(0);
oldestVisibleVersion.store(0);
tToTX.clear();
}
......@@ -261,7 +354,7 @@ class StateContext {
* used for cleaning up version arrays */
std::atomic<TransactionID> oldestVisibleVersion{0};
TableID numStates{0u};
GroupID numGroups{0u};
GroupID * numGroups;
};
} /* end namespace pfabric */
......
......@@ -81,7 +81,7 @@ void generateWorkload(const double theta, const std::string &name) {
workload_file.open(name);
if constexpr (Z) {
ZipfianGenerator zipfGen{0, keyRange-1, theta};
ZipfianGenerator<KeyType> zipfGen{0, keyRange-1, theta};
std::cout << "Using Zipf with theta = " << theta << '\n';
for (auto t = 1u; t < workloadNumTxs+1; ++t) {
for(auto k = 0u; k < txSize; ++k) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment