Commit 6beba6ae authored by Philipp Götze's avatar Philipp Götze
Browse files

Adapted Tables and Tx usecase to work with NVM-based DSs

parent ebc58ac9
Pipeline #159 canceled with stages
in 2 minutes and 25 seconds
/*
* Copyright (C) 2014-2019 DBIS Group - TU Ilmenau, All Rights Reserved.
*
* This file is part of the PipeFabric package.
*
* PipeFabric is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* PipeFabric is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with PipeFabric. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef FromMVCCTable_hpp_
#define FromMVCCTable_hpp_
#include <random>
#include "core/Punctuation.hpp"
#include "core/Tuple.hpp"
#include "qop/DataSource.hpp"
#include "pubsub/channels/ConnectChannels.hpp"
#include "qop/BaseOp.hpp"
#include "qop/OperatorMacros.hpp"
#include "table/MVCCTable.hpp"
namespace pfabric {
/**
* @brief A SelectFromTable operator creates a stream from the tuples
* of a relational table.
*
* The SelectFromTable operator produces a stream of tuples
* from the given table which can be optinally selected by a predicate.
*
* @tparam StreamElement
* the data stream element type which shall be retrieve from the table
* @tparam KeyType
* the data type of the key for identifying tuples in the table
*/
template<typename StreamElement, typename KeyType, size_t TxSize>
class FromMVCCTables : public DataSource<StreamElement> {
public:
PFABRIC_SOURCE_TYPEDEFS(StreamElement);
using RecordType = typename StreamElement::element_type;
using TablePtr = std::shared_ptr<MVCCTable<RecordType, KeyType>>;
using SCtxType = StateContext<RecordType, KeyType>;
using Predicate = typename MVCCTable<RecordType, KeyType>::Predicate;
/**
* Create a new SelectFromTable operator that produces a stream of tuples
* from the given table.
* @param tbl the table that is read
* @param pred an optional filter predicate
*/
FromMVCCTables(unsigned int keyRange, SCtxType& sCtx)
: mTables{sCtx.regStates[0], sCtx.regStates[1]}, dis{0, keyRange}, mSCtx{sCtx} {}
/**
* Deallocates all resources.
*/
~FromMVCCTables() {}
unsigned long start() {
auto mTxnID = mSCtx.newTx();
KeyType mKeys[TxSize];
for(auto i = 0u; i < TxSize; i++) {
mKeys[i] = dis(mSCtx.rndGen);
}
assert(mTables[0].get() != nullptr);
assert(mTables[1].get() != nullptr);
SmartPtr<RecordType> tpls[2][TxSize];
restart:;
for (auto i = 0u; i < 2; i++) {
for (auto j = 0u; j < TxSize; j++) {
if (mTables[i]->getByKey(mTxnID, mKeys[j], tpls[i][j]) != 0) {
/* restart, caused by inconsistency */
std::cout << "Key: " << mKeys[j] << std::endl;
mSCtx.restarts++;
//mTxnID = mSCtx.newTx();
//boost::this_thread::sleep_for(boost::chrono::milliseconds(1));
//goto restart;
return 0;
}
}
}
// check if same for correctness criteria
//if (std::get<2>(*tpls[0][0]) != std::get<2>(*tpls[1][0]) || std::get<2>(*tpls[0][1]) != std::get<2>(*tpls[1][1]))
// std::cout << "ERROR: INCONSISTENT READ\n";
// when everything consistent, publish the tuples
for (auto i = 0u; i < 2; i++) {
for (auto j = 0u; j < TxSize; j++) {
this->getOutputDataChannel().publish(tpls[i][j], false);
}
}
this->getOutputPunctuationChannel().publish(PunctuationPtr(new Punctuation(Punctuation::EndOfStream)));
mSCtx.removeTx(mTxnID);
return 4;
}
private:
const TablePtr mTables[2]; //< the table from which the tuples are fetched
std::uniform_int_distribution<KeyType> dis;
SCtxType& mSCtx;
};
}
#endif
...@@ -98,6 +98,7 @@ namespace pfabric { ...@@ -98,6 +98,7 @@ namespace pfabric {
mSCtx.restarts++; mSCtx.restarts++;
mTables[0]->cleanUpReads(mKeys, i?j+1:j); mTables[0]->cleanUpReads(mKeys, i?j+1:j);
mTables[1]->cleanUpReads(mKeys, j); mTables[1]->cleanUpReads(mKeys, j);
mSCtx.setReadCTS(mTxnID, 0, 0);
boost::this_thread::sleep_for(boost::chrono::nanoseconds(500*TxSize*waitTime)); boost::this_thread::sleep_for(boost::chrono::nanoseconds(500*TxSize*waitTime));
// waitTime *= 2; // waitTime *= 2;
// boost::this_thread::interruption_point(); // boost::this_thread::interruption_point();
......
...@@ -17,8 +17,8 @@ ...@@ -17,8 +17,8 @@
* along with PipeFabric. If not, see <http://www.gnu.org/licenses/>. * along with PipeFabric. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef NVMTable_hpp_ #ifndef BDCCPTable_hpp_
#define NVMTable_hpp_ #define BDCCPTable_hpp_
#include <iostream> #include <iostream>
#include <vector> #include <vector>
...@@ -34,7 +34,6 @@ ...@@ -34,7 +34,6 @@
#include <boost/signals2.hpp> #include <boost/signals2.hpp>
#include <libpmemobj++/make_persistent.hpp> #include <libpmemobj++/make_persistent.hpp>
#include <libpmemobj++/p.hpp>
#include <libpmemobj++/persistent_ptr.hpp> #include <libpmemobj++/persistent_ptr.hpp>
#include <libpmemobj++/pool.hpp> #include <libpmemobj++/pool.hpp>
#include <libpmemobj++/transaction.hpp> #include <libpmemobj++/transaction.hpp>
...@@ -52,76 +51,10 @@ ...@@ -52,76 +51,10 @@
namespace pfabric { namespace pfabric {
//TODO: Maybe the pmem device path prefix should be a CMake variable? //TODO: Maybe the pmem device path prefix should be a CMake variable?
const std::string pathPrefix = "/mnt/pmem/test/"; //constexpr auto pathPrefix = "/mnt/pmem/test/";
namespace detail {
struct GetType {
template<typename T>
static auto apply(T &t) {
return ColumnInfo::Void_Type;
}
};
template<>
inline auto GetType::apply<int>(int &t) {
return ColumnInfo::Int_Type;
}
template<>
inline auto GetType::apply<double>(double &t) {
return ColumnInfo::Double_Type;
}
template<>
inline auto GetType::apply<std::string>(std::string &t) {
return ColumnInfo::String_Type;
}
template<class Tuple, std::size_t CurrentIndex>
struct TupleTypes;
template<class Tuple, std::size_t CurrentIndex>
struct TupleTypes {
static void apply(Tuple tp, std::vector<ColumnInfo> &cols) {
TupleTypes<Tuple, CurrentIndex - 1>::apply(tp, cols);
auto type = GetType::apply(std::get<CurrentIndex - 1>(tp));
cols.push_back(ColumnInfo("", type));
}
};
template<class Tuple>
struct TupleTypes<Tuple, 1> {
static void apply(Tuple tp, std::vector<ColumnInfo> &cols) {
auto type = GetType::apply(std::get<0>(tp));
cols.push_back(ColumnInfo("", type));
}
};
template<class Tuple>
TableInfo constructSchema(const std::string &tableName) {
typedef typename Tuple::Base Base;
Base t; // create default initialized std::tuple
std::vector<ColumnInfo> cols;
detail::TupleTypes<Base, std::tuple_size<Base>::value>::apply(t, cols);
TableInfo tInfo(tableName);
tInfo.setColumns(cols);
return tInfo;
}
template<typename T>
struct is_tuple_impl : std::false_type {};
template<typename... Ts>
struct is_tuple_impl<pfabric::Tuple<Ts...>> : std::true_type {};
template<typename T>
struct is_tuple : is_tuple_impl<std::decay_t<T>> {};
} /* namespace detail */
using pmem::obj::delete_persistent; using pmem::obj::delete_persistent;
using pmem::obj::make_persistent; using pmem::obj::make_persistent;
using pmem::obj::p;
using pmem::obj::persistent_ptr; using pmem::obj::persistent_ptr;
using pmem::obj::pool; using pmem::obj::pool;
using pmem::obj::transaction; using pmem::obj::transaction;
...@@ -129,32 +62,32 @@ using dbis::ptable::PTable; ...@@ -129,32 +62,32 @@ using dbis::ptable::PTable;
using dbis::ptable::PTuple; using dbis::ptable::PTuple;
template<typename KeyType, typename RecordType> template<typename KeyType, typename RecordType>
class NVMIterator { class BDCCPIterator {
public: public:
static_assert(detail::is_tuple<RecordType>::value, "Value type must be a pfabric::Tuple"); static_assert(is_tuple<RecordType>::value, "Value type must be a pfabric::Tuple");
using TupleType = typename RecordType::Base; using TupleType = typename RecordType::Base;
// using Predicate = std::function<bool(const PTuple<TupleType, KeyType> &)>; // using Predicate = std::function<bool(const PTuple<TupleType, KeyType> &)>;
using Predicate = std::function<bool(const RecordType &)>; using Predicate = std::function<bool(const RecordType &)>;
using PTableType = PTable<KeyType, TupleType>; using PTableType = PTable<KeyType, TupleType>;
explicit NVMIterator() { explicit BDCCPIterator() {
} }
explicit NVMIterator(typename PTableType::iterator &&_iter, typename PTableType::iterator &&_end, Predicate _pred) : explicit BDCCPIterator(typename PTableType::iterator &&_iter, typename PTableType::iterator &&_end, Predicate _pred) :
iter(std::move(_iter)), end(std::move(_end)), pred(_pred) { iter(std::move(_iter)), end(std::move(_end)), pred(_pred) {
while (isValid() && !pred(*(*iter).createTuple())) while (isValid() && !pred(*(*iter).createTuple()))
iter++; iter++;
} }
NVMIterator &operator++() { BDCCPIterator &operator++() {
iter++; iter++;
while (isValid() && !pred(*(*iter).createTuple())) while (isValid() && !pred(*(*iter).createTuple()))
iter++; iter++;
return *this; return *this;
} }
NVMIterator operator++(int) { BDCCPIterator operator++(int) {
auto tmp = *this; auto tmp = *this;
++(*this); ++(*this);
return tmp; return tmp;
...@@ -178,15 +111,15 @@ class NVMIterator { ...@@ -178,15 +111,15 @@ class NVMIterator {
}; };
template<typename KeyType, typename RecordType> template<typename KeyType, typename RecordType>
inline NVMIterator<KeyType, RecordType> makeNVMIterator( inline BDCCPIterator<KeyType, RecordType> makeBDCCPIterator(
typename PTable<KeyType, typename RecordType::Base>::iterator &&iter, typename PTable<KeyType, typename RecordType::Base>::iterator &&iter,
typename PTable<KeyType, typename RecordType::Base>::iterator &&end, typename PTable<KeyType, typename RecordType::Base>::iterator &&end,
typename NVMIterator<KeyType, RecordType>::Predicate pred) { typename BDCCPIterator<KeyType, RecordType>::Predicate pred) {
return NVMIterator<KeyType, RecordType>(std::move(iter), std::move(end), pred); return BDCCPIterator<KeyType, RecordType>(std::move(iter), std::move(end), pred);
} }
/**************************************************************************//** /**************************************************************************//**
* \brief NVMTable is a class for storing a relation of tuples of the same type. * \brief BDCCPTable is a class for storing a relation of tuples of the same type.
* *
* Table implements a relational table for storing tuples of a given type * Table implements a relational table for storing tuples of a given type
* \c RecordType which are indexed by the key of type \c KeyType. * \c RecordType which are indexed by the key of type \c KeyType.
...@@ -199,9 +132,9 @@ inline NVMIterator<KeyType, RecordType> makeNVMIterator( ...@@ -199,9 +132,9 @@ inline NVMIterator<KeyType, RecordType> makeNVMIterator(
* the data type of the key column (default = int) * the data type of the key column (default = int)
*****************************************************************************/ *****************************************************************************/
template<typename RecordType, typename KeyType = DefaultKeyType> template<typename RecordType, typename KeyType = DefaultKeyType>
class NVMTable : public BaseTable { class BDCCPTable : public BaseTable {
public: public:
static_assert(detail::is_tuple<RecordType>::value, "Value type must be a pfabric::Tuple"); static_assert(is_tuple<RecordType>::value, "Value type must be a pfabric::Tuple");
using TupleType = typename RecordType::Base; using TupleType = typename RecordType::Base;
using PTableType = PTable<KeyType, TupleType>; using PTableType = PTable<KeyType, TupleType>;
...@@ -218,11 +151,13 @@ class NVMTable : public BaseTable { ...@@ -218,11 +151,13 @@ class NVMTable : public BaseTable {
**/ **/
using UpdelFunc = std::function<bool(RecordType &)>; using UpdelFunc = std::function<bool(RecordType &)>;
using InsertFunc = std::function<RecordType()>;
/** typedef for a callback function which is invoked when the table was updated */ /** typedef for a callback function which is invoked when the table was updated */
using ObserverCallback = boost::signals2::signal<void(const RecordType &, TableParams::ModificationMode)>; using ObserverCallback = boost::signals2::signal<void(const RecordType &, TableParams::ModificationMode)>;
/** typedef for an iterator to scan the table */ /** typedef for an iterator to scan the table */
using TableIterator = NVMIterator<KeyType, RecordType>; using TableIterator = BDCCPIterator<KeyType, RecordType>;
/** typedef for a predicate evaluated using a scan: see \TableIterator for details */ /** typedef for a predicate evaluated using a scan: see \TableIterator for details */
using Predicate = typename TableIterator::Predicate; using Predicate = typename TableIterator::Predicate;
...@@ -230,14 +165,14 @@ class NVMTable : public BaseTable { ...@@ -230,14 +165,14 @@ class NVMTable : public BaseTable {
/************************************************************************//** /************************************************************************//**
* \brief Constructor for creating an empty table with only a given name. * \brief Constructor for creating an empty table with only a given name.
*****************************************************************************/ *****************************************************************************/
NVMTable(const std::string &tableName) : BaseTable(detail::constructSchema<RecordType>(tableName)) { BDCCPTable(const std::string &tableName) : BaseTable(constructSchema<RecordType>(tableName)) {
openOrCreateTable(detail::constructSchema<RecordType>(tableName)); openOrCreateTable(constructSchema<RecordType>(tableName));
} }
/************************************************************************//** /************************************************************************//**
* \brief Constructor for creating an empty table with a given schema. * \brief Constructor for creating an empty table with a given schema.
*****************************************************************************/ *****************************************************************************/
NVMTable(const TableInfo &tInfo) : BDCCPTable(const TableInfo &tInfo) :
BaseTable(tInfo) { BaseTable(tInfo) {
openOrCreateTable(tInfo); openOrCreateTable(tInfo);
} }
...@@ -245,7 +180,7 @@ class NVMTable : public BaseTable { ...@@ -245,7 +180,7 @@ class NVMTable : public BaseTable {
/************************************************************************//** /************************************************************************//**
* \brief Destructor for table. * \brief Destructor for table.
*****************************************************************************/ *****************************************************************************/
~NVMTable() { ~BDCCPTable() {
// pop.close(); // pop.close();
} }
...@@ -379,7 +314,7 @@ class NVMTable : public BaseTable { ...@@ -379,7 +314,7 @@ class NVMTable : public BaseTable {
* \return a pair of iterators * \return a pair of iterators
*****************************************************************************/ *****************************************************************************/
TableIterator select(Predicate func) { TableIterator select(Predicate func) {
return makeNVMIterator<KeyType, RecordType>(std::move(pTable->begin()), std::move(pTable->end()), func); return makeBDCCPIterator<KeyType, RecordType>(std::move(pTable->begin()), std::move(pTable->end()), func);
} }
/************************************************************************//** /************************************************************************//**
...@@ -397,7 +332,7 @@ class NVMTable : public BaseTable { ...@@ -397,7 +332,7 @@ class NVMTable : public BaseTable {
*****************************************************************************/ *****************************************************************************/
TableIterator select() { TableIterator select() {
auto alwaysTrue = [](const RecordType &) { return true; }; auto alwaysTrue = [](const RecordType &) { return true; };
return makeNVMIterator<KeyType, RecordType>(std::move(pTable->begin()), std::move(pTable->end()), alwaysTrue); return makeBDCCPIterator<KeyType, RecordType>(std::move(pTable->begin()), std::move(pTable->end()), alwaysTrue);
} }
/************************************************************************//** /************************************************************************//**
...@@ -436,8 +371,17 @@ class NVMTable : public BaseTable { ...@@ -436,8 +371,17 @@ class NVMTable : public BaseTable {
q = nullptr; q = nullptr;
}); });
pop.close(); pop.close();
pmempool_rm((pathPrefix + BaseTable::mTableInfo->tableName() + ".db").c_str(), 1); //pmempool_rm((pathPrefix + BaseTable::mTableInfo->tableName() + ".db").c_str(), 1);
//std::remove((BaseTable::mTableInfo->tableName()+".db").c_str()); std::remove((BaseTable::mTableInfo->tableName()+".db").c_str());
}
void truncate() {
auto pop = pool_by_pptr(q);
transaction::run(pop, [&] {
delete_persistent<PTableType>(q->pTable);
q->pTable = make_persistent<PTableType>();
pTable = q->pTable;
});
} }
void print() { void print() {
...@@ -490,8 +434,8 @@ class NVMTable : public BaseTable { ...@@ -490,8 +434,8 @@ class NVMTable : public BaseTable {
persistent_ptr<PTableType> pTable; persistent_ptr<PTableType> pTable;
ObserverCallback mImmediateObservers, mDeferredObservers; ObserverCallback mImmediateObservers, mDeferredObservers;
}; /* class NVMTable */ }; /* class BDCCPTable */
} /* namespace pfabric */ } /* namespace pfabric */
#endif /* NVMTable_hpp_ */ #endif /* BDCCPTable_hpp_ */
...@@ -475,6 +475,7 @@ class BOCCTable : public BaseTable, ...@@ -475,6 +475,7 @@ class BOCCTable : public BaseTable,
unsigned long size() const { return tbl.size(); } unsigned long size() const { return tbl.size(); }
void drop() { committedWSs.clear(); tbl.drop(); } void drop() { committedWSs.clear(); tbl.drop(); }
void truncate() { committedWSs.clear(); tbl.truncate(); }
private: private:
......
...@@ -34,8 +34,6 @@ ...@@ -34,8 +34,6 @@
#include "table/TableInfo.hpp" #include "table/TableInfo.hpp"
#include "table/TableException.hpp" #include "table/TableException.hpp"
#include "fmt/format.h"
namespace pfabric { namespace pfabric {
/** /**
......
...@@ -377,6 +377,11 @@ public: ...@@ -377,6 +377,11 @@ public:
void drop() { void drop() {
mDataTable.clear(); mDataTable.clear();
//mDataTable = nullptr;
}
void truncate() {
mDataTable.clear();
} }
private: private:
......
...@@ -410,8 +410,12 @@ public: ...@@ -410,8 +410,12 @@ public:
void drop() { void drop() {
mDataTable.clear(); mDataTable.clear();
//mDataTable = nullptr;
} }
void truncate() {
mDataTable.clear();
}
private: private:
/** /**
* @brief Perform the actual notification * @brief Perform the actual notification
......
...@@ -37,6 +37,8 @@ ...@@ -37,6 +37,8 @@
#ifdef USE_ROCKSDB_TABLE #ifdef USE_ROCKSDB_TABLE
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "table/RDBTable.hpp" #include "table/RDBTable.hpp"
#elif USE_NVM_TABLES
#include "table/PBPTreeTable.hpp"
#else #else
#include "table/CuckooTable.hpp" #include "table/CuckooTable.hpp"
#include "table/HashMapTable.hpp" #include "table/HashMapTable.hpp"
...@@ -195,6 +197,8 @@ class MVCCTable : public BaseTable, ...@@ -195,6 +197,8 @@ class MVCCTable : public BaseTable,
public: public:
#ifdef USE_ROCKSDB_TABLE #ifdef USE_ROCKSDB_TABLE
using Table = RDBTable<pfabric::Tuple<MVCCObject<TupleType>>, KeyType>; using Table = RDBTable<pfabric::Tuple<MVCCObject<TupleType>>, KeyType>;
#elif USE_NVM_TABLES
using Table = PBPTreeTable<pfabric::Tuple<MVCCObject<TupleType>>, KeyType>;
#else #else
using Table = CuckooTable<pfabric::Tuple<MVCCObject<TupleType>>, KeyType>; using Table = CuckooTable<pfabric::Tuple<MVCCObject<TupleType>>, KeyType>;
#endif #endif
...@@ -286,7 +290,7 @@ class MVCCTable : public BaseTable, ...@@ -286,7 +290,7 @@ class MVCCTable : public BaseTable,
for(const auto &e : writeSet.set) { for(const auto &e : writeSet.set) {
/* if entry exists */ /* if entry exists */
try { try {
newEntries[i] = KeyMVCCPair{e.first, get<0>(*tbl.getByKey(e.first))}; newEntries[i] = KeyMVCCPair{e.first, ns_types::get<0>(*tbl.getByKey(e.first))};
auto &last = newEntries[i].mvcc; auto &last = newEntries[i].mvcc;
auto iPos = getFreePos(last.usedSlots); auto iPos = getFreePos(last.usedSlots);
while (iPos > last.Versions - 1) { while (iPos > last.Versions - 1) {
...@@ -477,7 +481,7 @@ class MVCCTable : public BaseTable, ...@@ -477,7 +481,7 @@ class MVCCTable : public BaseTable,
// locks.unlockShared(key); // locks.unlockShared(key);
return Errc::NOT_FOUND; return Errc::NOT_FOUND;
} }
const auto& mvcc = get<0>(*tplPtr); const auto& mvcc = ns_types::get<0>(*tplPtr);
// locks.unlockShared(key); // locks.unlockShared(key);
/* Get read CTS (version that was read first) for consistency */ /* Get read CTS (version that was read first) for consistency */
...@@ -542,6 +546,7 @@ class MVCCTable : public BaseTable, ...@@ -542,6 +546,7 @@ class MVCCTable : public BaseTable,
unsigned long size() const { return tbl.size(); } unsigned long size() const { return tbl.size(); }
void drop() { tbl.drop(); } void drop() { tbl.drop(); }
void truncate() { tbl.truncate(); }
private: private:
...@@ -557,6 +562,7 @@ class MVCCTable : public BaseTable, ...@@ -557,6 +562,7 @@ class MVCCTable : public BaseTable,
}; /* end class MVCCTable */