Commit 76aa4c0f authored by Kai-Uwe Sattler
Browse files

transaction support added

parent 7f40cecd
......@@ -34,7 +34,6 @@ add_custom_command(
${json_SOURCE_DIR}/src/json.hpp
${THIRD_PARTY_DIR}/json)
#--------------------------------------------------------------------------------
# the format library
download_project(PROJ Format
GIT_REPOSITORY https://github.com/fmtlib/fmt.git
......@@ -48,12 +47,6 @@ add_custom_command(
COMMAND ${CMAKE_COMMAND} -E copy
${Format_SOURCE_DIR}/fmt/format.h
${THIRD_PARTY_DIR}/fmt
COMMAND ${CMAKE_COMMAND} -E copy
${Format_SOURCE_DIR}/fmt/ostream.h
${THIRD_PARTY_DIR}/fmt
COMMAND ${CMAKE_COMMAND} -E copy
${Format_SOURCE_DIR}/fmt/ostream.cc
${THIRD_PARTY_DIR}/fmt
COMMAND ${CMAKE_COMMAND} -E copy
${Format_SOURCE_DIR}/fmt/format.cc
${THIRD_PARTY_DIR}/fmt)
......@@ -61,7 +54,6 @@ add_custom_command(
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DFMT_HEADER_ONLY=1")
include_directories("${THIRD_PARTY_DIR}/fmt")
#--------------------------------------------------------------------------------
# the SimpleWeb library
download_project(PROJ SimpleWeb
GIT_REPOSITORY https://github.com/eidheim/Simple-Web-Server.git
......@@ -76,8 +68,6 @@ add_custom_command(
${SimpleWeb_SOURCE_DIR}
${THIRD_PARTY_DIR}/SimpleWeb)
#--------------------------------------------------------------------------------
if (BUILD_GOOGLE_BENCH)
# Google Benchmark framework
download_project(PROJ benchmark
GIT_REPOSITORY https://github.com/google/benchmark.git
......@@ -85,55 +75,3 @@ download_project(PROJ benchmark
UPDATE_DISCONNECTED 1
QUIET
)
# Configure and build Google Benchmark inside its downloaded source tree
# (Release build type), then stage the result under ${THIRD_PARTY_DIR}/benchmark:
# the public headers go to include/ and libbenchmark.a goes to lib/.
# NOTE(review): the configure step passes no source directory to cmake and
# relies on the chdir into ${benchmark_SOURCE_DIR} -- confirm the cmake
# version in use accepts that.
# NOTE(review): $(MAKE) is make syntax -- presumably this requires a
# Makefile generator; verify before using another generator.
add_custom_command(
OUTPUT ${THIRD_PARTY_DIR}/benchmark
COMMAND ${CMAKE_COMMAND} -E chdir ${benchmark_SOURCE_DIR} cmake -DCMAKE_BUILD_TYPE=Release
COMMAND ${CMAKE_COMMAND} -E chdir ${benchmark_SOURCE_DIR} $(MAKE)
COMMAND ${CMAKE_COMMAND} -E make_directory ${THIRD_PARTY_DIR}/benchmark/include
COMMAND ${CMAKE_COMMAND} -E make_directory ${THIRD_PARTY_DIR}/benchmark/lib
COMMAND ${CMAKE_COMMAND} -E copy_directory
${benchmark_SOURCE_DIR}/include
${THIRD_PARTY_DIR}/benchmark/include
COMMAND ${CMAKE_COMMAND} -E copy
${benchmark_SOURCE_DIR}/src/libbenchmark.a
${THIRD_PARTY_DIR}/benchmark/lib
)
endif()
#--------------------------------------------------------------------------------
# Everything in this section is only evaluated when USE_ROCKSDB_TABLE is set.
if(USE_ROCKSDB_TABLE)
# RocksDB key-value store: fetched from GitHub, pinned to release tag v5.1.4
download_project(PROJ rocksdb
GIT_REPOSITORY https://github.com/facebook/rocksdb
GIT_TAG v5.1.4
UPDATE_DISCONNECTED 1
QUIET
)
# Build the static library in the downloaded source tree (make target
# "static_lib") and stage it under ${THIRD_PARTY_DIR}/rocksdb: the headers
# are copied to include/ and librocksdb.a is copied to lib/.
# NOTE(review): $(MAKE) is make syntax -- presumably this requires a
# Makefile generator; verify before using another generator.
add_custom_command(
OUTPUT ${THIRD_PARTY_DIR}/rocksdb
COMMAND ${CMAKE_COMMAND} -E chdir ${rocksdb_SOURCE_DIR} $(MAKE) static_lib
COMMAND ${CMAKE_COMMAND} -E make_directory ${THIRD_PARTY_DIR}/rocksdb/include
COMMAND ${CMAKE_COMMAND} -E make_directory ${THIRD_PARTY_DIR}/rocksdb/lib
COMMAND ${CMAKE_COMMAND} -E copy_directory
${rocksdb_SOURCE_DIR}/include
${THIRD_PARTY_DIR}/rocksdb/include
COMMAND ${CMAKE_COMMAND} -E copy
${rocksdb_SOURCE_DIR}/librocksdb.a
${THIRD_PARTY_DIR}/rocksdb/lib
)
endif()
#--------------------------------------------------------------------------------
# Everything in this section is only evaluated when BUILD_USE_CASES is set.
if(BUILD_USE_CASES)
# data for use cases: fetched from the master branch of the dbis-ilm/data
# repository (no fixed tag is pinned)
download_project(PROJ data
GIT_REPOSITORY https://github.com/dbis-ilm/data.git
GIT_TAG master
UPDATE_DISCONNECTED 1
QUIET
)
# Copy the DEBS2017 directory of the download into ${THIRD_PARTY_DIR}.
# NOTE(review): the source path is spelled out as
# ${PROJECT_BINARY_DIR}/data-src instead of using a *_SOURCE_DIR variable as
# the other sections do -- confirm that this is where download_project
# places the checkout.
file(COPY ${PROJECT_BINARY_DIR}/data-src/DEBS2017
DESTINATION ${THIRD_PARTY_DIR}
)
endif()
......@@ -57,6 +57,8 @@ typedef std::size_t TupleLimit;
/// the length of a sliding window
typedef unsigned int SlideLength;
/// unique identifier of a transaction
typedef unsigned long TransactionID;
/// vector of strings
typedef std::vector< std::string > StringTuple;
......
......@@ -51,6 +51,11 @@ public:
EndOfSubStream = (1u << 1), //< the end of a substream was identified
WindowExpired = (1u << 2), //< the window was expired (used together with tumbling windows)
SlideExpired = (1u << 3), //< ???
TxBegin = (1u << 4), //< begin of transaction (requires the TransactionID in mData field)
TxCommit = (1u << 5), //< commit of the transaction whose TransactionID is
//< given in the mData field
TxAbort = (1u << 6), //< aborting the transaction whose TransactionID is
//< given in the mData field
All = (~0u), //< all of the above, used for masking
};
......@@ -62,7 +67,7 @@ public:
mPtype(pt), mData(val), mTstamp(ts) {
}
Punctuation(PType pt, Timestamp ts = TimestampHelper::timestampFromCurrentTime()) :
Punctuation(PType pt, Timestamp ts = TimestampHelper::timestampFromCurrentTime()) :
mPtype(pt), mTstamp(ts) {
}
......
......@@ -113,6 +113,19 @@ public:
return tbl;
}
/**
 * @brief Creates a new transactional table and registers it by name.
 *
 * The name is taken from the given table description. If a table with this
 * name is already registered, nothing is created and an exception is raised;
 * otherwise a new TxTable is constructed from the description and stored in
 * the table set under that name.
 *
 * @tparam RecordType
 *    the record type of the table
 * @tparam KeyType
 *    the key type of the table (DefaultKeyType if not specified)
 * @param[in] tblInfo
 *    the table description; its tableName() is used as the registration key
 * @return
 *    a shared pointer to the newly created table
 * @throws TableException
 *    if a table with the same name already exists
 */
template <typename RecordType, typename KeyType = DefaultKeyType>
std::shared_ptr<TxTable<RecordType, KeyType>> createTxTable(const TableInfo& tblInfo) {
  // Deliberately no dynamic exception specification: `throw (TableException)`
  // was removed from the language in C++17, and before that it turned any
  // other exception (e.g. std::bad_alloc from make_shared) into a call to
  // std::unexpected().
  const auto& tableName = tblInfo.tableName();

  // first we check whether the table exists already
  if (mTableSet.find(tableName) != mTableSet.end())
    throw TableException("table already exists");

  // create a new table and register it
  auto tbl = std::make_shared<TxTable<RecordType, KeyType>>(tblInfo);
  mTableSet[tableName] = tbl;
  return tbl;
}
/**
* @brief Gets a table by its name.
*
......
This diff is collapsed.
......@@ -19,19 +19,22 @@
* If not you can find the GPL at http://www.gnu.org/copyleft/gpl.html
*/
#include <boost/thread.hpp>
#include <boost/chrono.hpp>
#include "Topology.hpp"
#include "qop/ZMQSource.hpp"
using namespace pfabric;
Topology::~Topology() {
// delete all pipes we have created
/*
for (auto i : pipes) {
delete i;
if (!wakeupTimers.empty()) {
for (auto& thr : wakeupTimers) {
thr.interrupt();
}
for (auto& thr : wakeupTimers) {
thr.join();
}
}
*/
}
void Topology::registerStartupFunction(StartupFunc func) {
......@@ -79,11 +82,11 @@ void Topology::wait() {
f.get();
}
void Topology::runEvery(const std::chrono::seconds& secs) {
wakeupTimers.push_back(std::thread([&](){
void Topology::runEvery(unsigned long secs) {
wakeupTimers.push_back(boost::thread([this, secs](){
while(true) {
std::this_thread::sleep_for(secs);
startAsync();
boost::this_thread::sleep_for(boost::chrono::seconds(secs));
this->start(false);
}
}));
}
......@@ -93,8 +96,7 @@ Pipe<TStringPtr> Topology::newStreamFromFile(const std::string& fname, unsigned
auto op = std::make_shared<TextFileSource>(fname, limit);
// register its start function
registerStartupFunction(std::bind(&TextFileSource::start, op.get()));
// and create a new pipe; we use a raw pointer here because
// we want to return a reference to a Pipe object
// and create a new pipe
return Pipe<TStringPtr>(dataflow, dataflow->addPublisher(op));
}
......@@ -104,8 +106,7 @@ Pipe<TStringPtr> Topology::newStreamFromRabbitMQ(const std::string& info, const
auto op = std::make_shared<RabbitMQSource>(info, queueName);
// register its start function
registerStartupFunction(std::bind(&RabbitMQSource::start, op.get()));
// and create a new pipe; we use a raw pointer here because
// we want to return a reference to a Pipe object
// and create a new pipe
return Pipe<TStringPtr>(dataflow, dataflow->addPublisher(op));
}
#endif
......@@ -117,8 +118,7 @@ Pipe<TStringPtr> Topology::newStreamFromKafka(const std::string& broker, const s
auto op = std::make_shared<KafkaSource>(broker, topic, groupID);
// register its start function
registerStartupFunction(std::bind(&KafkaSource::start, op.get()));
// and create a new pipe; we use a raw pointer here because
// we want to return a reference to a Pipe object
// and create a new pipe
return Pipe<TStringPtr>(dataflow, dataflow->addPublisher(op));
}
#endif
......@@ -129,8 +129,7 @@ Pipe<TStringPtr> Topology::newStreamFromMQTT(const std::string& conn, const std:
auto op = std::make_shared<MQTTSource>(conn, channel);
// register its start function
registerStartupFunction(std::bind(&MQTTSource::start, op.get()));
// and create a new pipe; we use a raw pointer here because
// we want to return a reference to a Pipe object
// and create a new pipe
return Pipe<TStringPtr>(dataflow, dataflow->addPublisher(op));
}
#endif
......@@ -143,8 +142,7 @@ Pipe<TStringPtr> Topology::newStreamFromREST(unsigned int port,
auto op = std::make_shared<RESTSource>(port, path, method, numThreads);
// register its start function
registerStartupFunction(std::bind(&RESTSource::start, op.get()));
// and create a new pipe; we use a raw pointer here because
// we want to return a reference to a Pipe object
// and create a new pipe
return Pipe<TStringPtr>(dataflow, dataflow->addPublisher(op));
}
......
......@@ -27,6 +27,7 @@
#include <future>
#include <mutex>
#include <chrono>
#include <boost/thread.hpp>
#include "core/Tuple.hpp"
......@@ -40,6 +41,7 @@
#include "qop/ToTable.hpp"
#include "qop/FromTable.hpp"
#include "qop/SelectFromTable.hpp"
#include "qop/SelectFromTxTable.hpp"
#include "qop/StreamGenerator.hpp"
#ifdef SUPPORT_MATRICES
#include "qop/FromMatrix.hpp"
......@@ -90,12 +92,11 @@ namespace pfabric {
/// the signature of a startup function
typedef std::function<unsigned long()> StartupFunc;
// std::list<Pipe*> pipes; //< the list of pipes created for this topology
std::vector<StartupFunc> startupList; //< the list of functions to be called for startup
std::vector<StartupFunc> prepareList; //< the list of functions to be called for startup
bool asyncStarted; //< true if we started asynchronously
std::vector<std::future<unsigned long> > startupFutures; //< futures for the startup functions
std::vector<std::thread> wakeupTimers; //< threads for runEvery queries
std::vector<boost::thread> wakeupTimers; //< interruptible threads for runEvery queries
std::mutex mMutex; //< mutex for accessing startupFutures
DataflowPtr dataflow;
......@@ -157,8 +158,8 @@ namespace pfabric {
*
* @param[in] secs
* the period of time between two invocations
*/
void runEvery(const std::chrono::seconds& secs);
*/
void runEvery(unsigned long secs);
/**
* @brief Waits until the execution of the topology stopped.
......@@ -375,6 +376,13 @@ namespace pfabric {
return Pipe<T>(dataflow, dataflow->addPublisher(op));
}
/**
 * @brief Create a SelectFromTxTable operator as data source.
 *
 * Constructs a SelectFromTxTable operator for the given table and predicate,
 * registers a startup function that invokes the operator's start() method,
 * and adds the operator as a publisher to the dataflow.
 *
 * @tparam T
 *    the stream element type produced by the operator; its element_type is
 *    the record type of the table
 * @tparam KeyType
 *    the key type of the table (DefaultKeyType if not specified)
 * @param[in] tbl
 *    the transactional table passed to the operator
 * @param[in] pred
 *    an optional predicate passed to the operator (nullptr by default)
 * @return
 *    a new pipe built from the dataflow and the publisher added for the operator
 */
template<typename T, typename KeyType = DefaultKeyType>
Pipe<T> selectFromTxTable(std::shared_ptr<TxTable<typename T::element_type, KeyType>> tbl,
typename TxTable<typename T::element_type, KeyType>::Predicate pred = nullptr) {
  using SourceOp = SelectFromTxTable<T, KeyType>;
  auto source = std::make_shared<SourceOp>(tbl, pred);

  // the startup function keeps its own copy of the shared pointer
  registerStartupFunction([source]() -> unsigned long {
    return source->start();
  });

  return Pipe<T>(dataflow, dataflow->addPublisher(source));
}
/**
* @brief Create a StreamGenerator operator as data source.
*
......
......@@ -57,7 +57,8 @@ public:
/**
* Typedef for a function pointer to a projection function.
*/
typedef std::function<OutputStreamElement (const InputStreamElement&, bool, StateRepPtr)> MapFunc;
typedef std::function<OutputStreamElement (const InputStreamElement&, bool,
StatefulMap<InputStreamElement, OutputStreamElement, StateRep>&)> MapFunc;
/**
* @brief Construct a new instance of the stateful map operator.
......@@ -81,6 +82,12 @@ public:
const std::string opName() const override { return std::string("StatefulMap"); }
/// Returns the state pointer (mState) held by this operator.
StateRepPtr state() { return mState; }
/**
 * @brief Publishes a punctuation on the output punctuation channel.
 *
 * @param[in] punctuation
 *    the punctuation to be forwarded to the subscribers of this operator
 */
void publishPunctuation( const PunctuationPtr& punctuation ) {
  auto&& punctuationChannel = this->getOutputPunctuationChannel();
  punctuationChannel.publish(punctuation);
}
private:
/**
......@@ -106,7 +113,7 @@ private:
* flag indicating whether the tuple is new or invalidated now
*/
void processDataElement( const InputStreamElement& data, const bool outdated ) {
auto res = mFunc( data, outdated, mState );
auto res = mFunc( data, outdated, *this );
this->getOutputDataChannel().publish( res, outdated );
}
......
......@@ -60,5 +60,6 @@ using Table = pfabric::HashMapTable<RecordType, KeyType>;
#endif
#endif
#include "table/TxTable.hpp"
#endif
......@@ -187,7 +187,8 @@ class TxTable : public BaseTable {
*/
unsigned long updateOrDeleteByKey(KeyType key, UpdelFunc ufunc, InsertFunc ifunc = nullptr) {
// TODO: Tx support
return rdbTable.updateOrDeleteByKey(key, ufunc, ifunc);
// return rdbTable.updateOrDeleteByKey(key, ufunc, ifunc);
return 0;
}
/**
......
......@@ -79,12 +79,12 @@ TEST_CASE("Applying a stateful map function to a tuple stream", "[StatefulMap]")
auto mockup = std::make_shared< StreamMockup<InTuplePtr, OutTuplePtr> >(input, expected);
auto map_fun = [&]( const InTuplePtr& tp, bool, TestMap::StateRepPtr state) -> OutTuplePtr {
state->cnt++;
state->sum += tp->getAttribute<2>();
auto map_fun = [&]( const InTuplePtr& tp, bool, TestMap& self) -> OutTuplePtr {
self.state()->cnt++;
self.state()->sum += tp->getAttribute<2>();
return makeTuplePtr(
tp->getAttribute<0>(), tp->getAttribute<2>(),
state->cnt, state->sum
self.state()->cnt, self.state()->sum
);
};
auto mop = std::make_shared< TestMap >(map_fun);
......
......@@ -264,7 +264,7 @@ struct MySumState {
TEST_CASE("Building and running a topology with stateful map", "[StatefulMap]") {
typedef TuplePtr<unsigned long, double> MyTuplePtr;
typedef TuplePtr<double> AggregationResultPtr;
typedef StatefulMap<AggregationResultPtr, AggregationResultPtr, MySumState> TestMap;
typedef StatefulMap<MyTuplePtr, AggregationResultPtr, MySumState> TestMap;
StreamGenerator<MyTuplePtr>::Generator streamGen ([](unsigned long n) -> MyTuplePtr {
return makeTuplePtr(n, (double)n + 0.5);
......@@ -274,9 +274,9 @@ TEST_CASE("Building and running a topology with stateful map", "[StatefulMap]")
std::vector<double> results;
auto mapFun = [&]( const MyTuplePtr& tp, bool, TestMap::StateRepPtr state) -> AggregationResultPtr {
state->sum += get<1>(tp);
return makeTuplePtr(state->sum);
auto mapFun = [&]( const MyTuplePtr& tp, bool, TestMap& self) -> AggregationResultPtr {
self.state()->sum += get<1>(tp);
return makeTuplePtr(self.state()->sum);
};
Topology t;
......
add_subdirectory(DEBS2017)
# add_subdirectory(MatrixProcessing)
add_subdirectory(freqTrajectories)
add_subdirectory(FreqTrajectories)
add_subdirectory(TxSupport)
......@@ -7,7 +7,7 @@
#include <vector> //cluster and Markov chain
#include <chrono> //measuring necessary time for anomaly detection
#include "pfabric.hpp"
#include "pfabric.hpp"
using namespace pfabric;
......@@ -426,9 +426,10 @@ int main(int argc, char **argv) {
//-----Preprocessing Input data-----
//write the tuples to state, return the preprocessed state as new tuple
.statefulMap<Preproc_Output_tp, InputState>([&](auto tp, bool, std::shared_ptr<InputState> state) {
.statefulMap<Preproc_Output_tp, InputState>([&](auto tp, bool,
StatefulMap<Input_tp, Preproc_Output_tp, InputState>& self) {
tuples_processed++; //for statistics
return calculateStates(tp, state);
return calculateStates(tp, self.state());
})
//filters useless and redundant tuples
.where([](auto tp, bool){return get<1>(tp) != 0; })
......@@ -440,8 +441,9 @@ int main(int argc, char **argv) {
.partitionBy([&threadAmount](auto tp) { return get<0>(tp) % threadAmount; }, threadAmount)
//-----Clustering step-----
.statefulMap<Cluster_Output_tp, ClusterState>([](auto tp, bool outdated, std::shared_ptr<ClusterState> state){
return calculateClusters(tp, outdated, state);
.statefulMap<Cluster_Output_tp, ClusterState>([](auto tp, bool outdated,
StatefulMap<Preproc_Output_tp, Cluster_Output_tp, ClusterState>& self){
return calculateClusters(tp, outdated, self.state());
})
//-----Markov chain-----
......
......@@ -77,7 +77,7 @@ FreqTrajectoryBatch findFrequentTrajectories(BatchPtr<TuplePtr<Pattern>> batchPt
}
#endif
WaypointPtr findClosestWaypoint(std::shared_ptr<Table<Landmark::element_type, uint_t>> landmarksTable,
WaypointPtr findClosestWaypoint(std::shared_ptr<Table<Landmark::element_type, uint_t>> landmarksTable,
TrackpointPtr tp) {
// waypoint to trackpoint
auto iter = landmarksTable->select();
......@@ -91,7 +91,7 @@ WaypointPtr findClosestWaypoint(std::shared_ptr<Table<Landmark::element_type, ui
closestTrackpoint = *iter;
}
}
return makeTuplePtr(get<0>(tp), get<0>(closestTrackpoint),
return makeTuplePtr(get<0>(tp), get<0>(closestTrackpoint),
get<1>(closestTrackpoint), get<2>(closestTrackpoint), get<3>(tp));
}
......@@ -138,7 +138,7 @@ int main(int argc, char **argv) {
/* ----------------------------------------------------------------- */
PFabricContext ctx;
/* --- Create the necessary tables --- */
createTables(ctx, importFile);
......@@ -146,7 +146,7 @@ int main(int argc, char **argv) {
auto landmarksTable = ctx.getTable<Landmark::element_type, uint_t>("landmarks");
auto tracksTable = ctx.getTable<UserTrack::element_type, uint_t>("user_tracks");
/* --- Topology #1: Receive user positions via REST, store them in the user_tracks table and
/* --- Topology #1: Receive user positions via REST, store them in the user_tracks table and
* update the visits table --- */
auto t1 = ctx.createTopology();
auto s = t1->newStreamFromREST(8099, "^/track$", RESTSource::POST_METHOD)
......@@ -160,13 +160,13 @@ int main(int argc, char **argv) {
// store the waypoint in the user_tracks table
auto s0 = s.keyBy<0, uint_t>()
.updateTable<UserTrack, uint_t>(tracksTable,
.updateTable<UserTrack, uint_t>(tracksTable,
[](WaypointPtr tp, bool outdated, UserTrack::element_type& rec) -> bool {
get<1>(rec).push_back(Trackpoint(get<4>(tp), get<1>(tp)));
return true;
},
[](WaypointPtr tp) -> UserTrack::element_type {
return UserTrack::element_type(get<0>(tp),
return UserTrack::element_type(get<0>(tp),
std::vector<Trackpoint>({ Trackpoint(get<4>(tp), get<1>(tp)) }));
}
);
......@@ -179,7 +179,7 @@ int main(int argc, char **argv) {
})
.keyBy<0, uint_t>() // LandmarkID
.toTable<uint_t>(visitsTable);
// auto s2 = s.print(std::cout);
t1->start();
......@@ -190,8 +190,7 @@ int main(int argc, char **argv) {
auto d = t2->selectFromTable<Visit, uint_t>(visitsTable)
.print(std::cout);
using namespace std::chrono_literals;
t2->runEvery(1min);
t2->runEvery(60);
#endif
#if 1
......@@ -210,7 +209,7 @@ int main(int argc, char **argv) {
}
});
t3->runEvery(2min);
t3->runEvery(120);
#endif
/* --- Start the internal Web server for serving files --- */
......
......@@ -4,7 +4,7 @@
* marked by BEGIN and COMMIT. The stream elements are used to update a
* relational table. A second batch topology (query) reads this table
* periodically. The transactional context guarantees snapshot isolation
* of this query.
* of this query.
*/
#include <chrono>
#include <cmath>
......@@ -99,4 +99,6 @@ int main(int argc, char **argv) {
t2->runEvery(5);
t1->wait();
accountTable->drop();
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment