Commit 21a8f354 authored by Philipp Götze's avatar Philipp Götze
Browse files

Prototypical ad-hoc and writing queries for MVCC Table

parent 1fded784
......@@ -100,7 +100,7 @@ if(USE_ROCKSDB_TABLE)
# RocksDB key-value store
download_project(PROJ rocksdb
GIT_REPOSITORY https://github.com/facebook/rocksdb
GIT_TAG v5.15.10
GIT_TAG v6.2.2
UPDATE_DISCONNECTED 1
QUIET
)
......
......@@ -22,7 +22,7 @@ option(USE_RABBITMQ "use RabbitMQ as network source"
option(USE_KAFKA "use Apache Kafka as network source" OFF)
option(USE_MQTT "use MQTT as network source" OFF)
option(USE_BOOST_SPIRIT_PARSER "use the boost::spirit::qi parsers (strings convertion)" ON )
option(USE_ROCKSDB_TABLE "use RocksDB for implementing persistent tables" OFF)
option(USE_ROCKSDB_TABLE "use RocksDB for implementing persistent tables" ON)
option(USE_NVM_TABLE "use NVM for implementing persistent memory tables" OFF)
option(BUILD_ONLY_LIBS "build only the two pipefabric libraries" ON )
option(BUILD_TEST_CASES "build tests for pipefabric functionality" OFF)
......
/*
* Copyright (C) 2014-2018 DBIS Group - TU Ilmenau, All Rights Reserved.
*
* This file is part of the PipeFabric package.
*
* PipeFabric is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* PipeFabric is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with PipeFabric. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef FromMVCCTable_hpp_
#define FromMVCCTable_hpp_
#include <random>
#include "core/Punctuation.hpp"
#include "core/Tuple.hpp"
#include "qop/DataSource.hpp"
#include "pubsub/channels/ConnectChannels.hpp"
#include "qop/BaseOp.hpp"
#include "qop/OperatorMacros.hpp"
#include "table/MVCCTable.hpp"
namespace pfabric {
/**
* @brief A SelectFromTable operator creates a stream from the tuples
* of a relational table.
*
* The SelectFromTable operator produces a stream of tuples
* from the given table which can be optinally selected by a predicate.
*
* @tparam StreamElement
* the data stream element type which shall be retrieve from the table
* @tparam KeyType
* the data type of the key for identifying tuples in the table
*/
template<typename StreamElement, typename KeyType, size_t TxSize>
class FromMVCCTables : public DataSource<StreamElement> {
public:
PFABRIC_SOURCE_TYPEDEFS(StreamElement);
using RecordType = typename StreamElement::element_type;
using TablePtr = std::shared_ptr<MVCCTable<RecordType, KeyType>>;
using SCtxType = StateContext<RecordType, KeyType>;
using Predicate = typename MVCCTable<RecordType, KeyType>::Predicate;
/**
* Create a new SelectFromTable operator that produces a stream of tuples
* from the given table.
* @param tbl the table that is read
* @param pred an optional filter predicate
*/
FromMVCCTables(unsigned int keyRange, SCtxType& sCtx)
: mTables{sCtx.regStates[0], sCtx.regStates[1]}, dis{0, keyRange}, mSCtx{sCtx} {}
/**
* Deallocates all resources.
*/
~FromMVCCTables() {}
unsigned long start() {
auto mTxnID = mSCtx.newTx();
KeyType mKeys[TxSize];
for(auto i = 0u; i < TxSize; i++) {
mKeys[i] = dis(mSCtx.rndGen);
}
assert(mTables[0].get() != nullptr);
assert(mTables[1].get() != nullptr);
SmartPtr<RecordType> tpls[2][TxSize];
restart:;
for (auto i = 0u; i < 2; i++) {
for (auto j = 0u; j < TxSize; j++) {
if (mTables[i]->getByKey(mTxnID, mKeys[j], tpls[i][j]) != 0) {
/* restart, caused by inconsistency */
std::cout << "Key: " << mKeys[j] << std::endl;
mSCtx.restarts++;
//mTxnID = mSCtx.newTx();
//boost::this_thread::sleep_for(boost::chrono::milliseconds(1));
//goto restart;
return 0;
}
}
}
// check if same for correctness criteria
//if (std::get<2>(*tpls[0][0]) != std::get<2>(*tpls[1][0]) || std::get<2>(*tpls[0][1]) != std::get<2>(*tpls[1][1]))
// std::cout << "ERROR: INCONSISTENT READ\n";
// when everything consistent, publish the tuples
for (auto i = 0u; i < 2; i++) {
for (auto j = 0u; j < TxSize; j++) {
this->getOutputDataChannel().publish(tpls[i][j], false);
}
}
this->getOutputPunctuationChannel().publish(PunctuationPtr(new Punctuation(Punctuation::EndOfStream)));
mSCtx.removeTx(mTxnID);
return 4;
}
private:
const TablePtr mTables[2]; //< the table from which the tuples are fetched
std::uniform_int_distribution<KeyType> dis;
SCtxType& mSCtx;
};
}
#endif
/*
* Copyright (C) 2014-2018 DBIS Group - TU Ilmenau, All Rights Reserved.
*
* This file is part of the PipeFabric package.
*
* PipeFabric is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* PipeFabric is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with PipeFabric. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef ToMVCCTable_hpp_
#define ToMVCCTable_hpp_
#include "qop/UnaryTransform.hpp"
#include "qop/OperatorMacros.hpp"
#include "table/Table.hpp"
namespace pfabric {
/**
* @brief A ToTable operator stores stream elements in a given relational table.
*
* The ToTable operator is a stream operator which stores tuples arriving
* from a stream in a given relational table. Depending on the existence
* of the key value, the tuple is newly inserted or an existing tuple is
* updated.
*
* @tparam StreamElement
* the data stream element type which shall be stored in the table
* @tparam KeyType
* the data type of the key for identifying tuples in the table
*/
template<
typename StreamElement,
typename KeyType = DefaultKeyType
>
class ToMVCCTable : public UnaryTransform<StreamElement, StreamElement> {
private:
PFABRIC_UNARY_TRANSFORM_TYPEDEFS(StreamElement, StreamElement);
public:
//< Typedef for a pointer to the table.
typedef std::shared_ptr<MVCCTable<typename StreamElement::element_type, KeyType>> TablePtr;
//< the function for deriving the key of an incoming stream element
typedef std::function< KeyType(const StreamElement&) > KeyFunc;
//< the function for deriving the TransactionID of an incoming stream element
typedef std::function< TransactionID(const StreamElement&) > TxIDFunc;
/**
* Create a new ToTable operator to store incoming tuples in the
* given table.
*
* @param tbl pointer to the table object
* @param func function pointer for deriving the key of the tuple
* @param autoCommit auto-commit mode
*/
ToMVCCTable(TablePtr tbl, KeyFunc keyFunc, TxIDFunc txFunc, bool autoCommit = false) :
mTable(tbl), mKeyFunc(keyFunc), mTxFunc(txFunc), mAutoCommit(autoCommit) {}
/**
* @brief Bind the callback for the data channel.
*/
BIND_INPUT_CHANNEL_DEFAULT(InputDataChannel, ToMVCCTable, processDataElement);
/**
* @brief Bind the callback for the punctuation channel.
*/
BIND_INPUT_CHANNEL_DEFAULT(InputPunctuationChannel, ToMVCCTable, processPunctuation);
const std::string opName() const override { return std::string("ToMVCCTable"); }
private:
/**
* @brief This method is invoked when a punctuation arrives.
*
* It simply forwards the @c punctuation to the subscribers.
*
* @param[in] punctuation
* the incoming punctuation tuple
*/
void processPunctuation(const PunctuationPtr& punctuation) {
if (punctuation->ptype() == Punctuation::TxCommit)
mTable->transactionPreCommit(boost::any_cast<TransactionID>(punctuation->data()));
else if (punctuation->ptype() == Punctuation::TxAbort)
mTable->transactionAbort(boost::any_cast<TransactionID>(punctuation->data()));
else if (punctuation->ptype() == Punctuation::TxBegin)
mTable->transactionBegin(boost::any_cast<TransactionID>(punctuation->data()));
this->getOutputPunctuationChannel().publish(punctuation);
}
/**
* @brief This method is invoked when a stream element arrives from the publisher.
*
* It inserts or updates the tuple in the table. If the tuple is outdated
* it will be removed instead.
*
* @param[in] data
* the incoming stream element
* @param[in] outdated
* flag indicating whether the tuple is new or invalidated now
*/
void processDataElement(const StreamElement& data, const bool outdated) {
auto txID = mTxFunc(data);
auto key = mKeyFunc(data);
if (outdated)
mTable->deleteByKey(txID, key);
else {
mTable->insert(txID, key, *data);
}
if (mAutoCommit) {
// perform commit
mTable->transactionCommit(txID);
}
this->getOutputDataChannel().publish(data, outdated);
}
TablePtr mTable; //< function pointer to the table where tuples will be stored
KeyFunc mKeyFunc; //< pointer to the key extractor function
TxIDFunc mTxFunc; //< pointer to the TransactionID extractor func
bool mAutoCommit; //< auto-commit mode
};
} // namespace pfabric
#endif
......@@ -479,6 +479,7 @@ class RDBTable : public BaseTable {
db = nullptr;
boost::filesystem::path dbFile(mTableName + ".db");
boost::filesystem::remove_all(dbFile);
openOrCreateTable(mTableName);
}
/**
......@@ -506,6 +507,7 @@ class RDBTable : public BaseTable {
private:
void openOrCreateTable(const std::string& tableName) throw(TableException) {
// writeOptions.sync = true;
std::string fileName = tableName + ".db";
rocksdb::Options options;
options.create_if_missing = true;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment