Commit 49653222 authored by Constantin Pohl's avatar Constantin Pohl
Browse files

minor bugfixes

parent e2040f8f
......@@ -45,10 +45,10 @@ add_custom_command(
OUTPUT ${THIRD_PARTY_DIR}/fmt
COMMAND ${CMAKE_COMMAND} -E make_directory ${THIRD_PARTY_DIR}/fmt
COMMAND ${CMAKE_COMMAND} -E copy
${Format_SOURCE_DIR}/fmt/format.h
${Format_SOURCE_DIR}/fmt/format.*
${THIRD_PARTY_DIR}/fmt
COMMAND ${CMAKE_COMMAND} -E copy
${Format_SOURCE_DIR}/fmt/format.cc
${Format_SOURCE_DIR}/fmt/ostream.*
${THIRD_PARTY_DIR}/fmt)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DFMT_HEADER_ONLY=1")
......
......@@ -112,7 +112,7 @@ endif()
#CMAKE_FORCE_CXX_COMPILER(icpc "Intel C++ Compiler")
# C++ compiler flags
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14 -Wall -Wno-deprecated -g -O1 -Wsign-compare")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14 -Wall -Wno-deprecated -g -O3 -Wsign-compare")
if ("${CMAKE_CXX_COMPILER_ID}" MATCHES "Clang")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-unused-local-typedefs -Wno-#pragma-messages")
elseif("${CMAKE_CXX_COMPILER_ID}" MATCHES "GNU")
......
......@@ -105,6 +105,8 @@ public:
//< setting the bool component of @c UpdateResult to false)
typedef std::function<bool(RecordType&)> UpdelFunc;
typedef std::function<RecordType()> InsertFunc;
//< typedef for a callback function which is invoked when the table was updated
typedef boost::signals2::signal<void (const RecordType&, TableParams::ModificationMode)> ObserverCallback;
......@@ -221,7 +223,7 @@ public:
* or deleted (=false)
* @return the number of modified tuples
*/
unsigned long updateOrDeleteByKey(KeyType key, UpdelFunc ufunc) {
unsigned long updateOrDeleteByKey(KeyType key, UpdelFunc ufunc, InsertFunc ifunc = nullptr) {
// make sure we have exclusive access
// note that we don't use a guard here!
std::unique_lock<std::mutex> lock(mMtx);
......@@ -246,8 +248,13 @@ public:
return num;
}
else {
// don't forget to release the lock
lock.unlock();
// key doesn't exist
if (ifunc != nullptr) {
insert(key, ifunc());
// don't forget to release the lock
lock.unlock();
return 1;
}
}
return 0;
}
......
......@@ -35,12 +35,16 @@
#include "fmt/format.h"
#ifdef USE_ROCKSDB_TABLE
#include "rocksdb/db.h"
#include "table/RDBTable.hpp"
#else
#include "table/HashMapTable.hpp"
#endif
#include "core/serialize.hpp"
#include "table/BaseTable.hpp"
#include "table/RDBTable.hpp"
#include "table/TableException.hpp"
#include "table/TableInfo.hpp"
#include "table/LogBuffer.hpp"
......@@ -63,37 +67,44 @@ namespace pfabric {
template <typename RecordType, typename KeyType = DefaultKeyType>
class TxTable : public BaseTable {
public:
#ifdef USE_ROCKSDB_TABLE
typedef RDBTable<RecordType, KeyType> Table;
#else
typedef HashMapTable<RecordType, KeyType> Table;
#endif
//< typedef for a predicate evaluated using a scan
// typedef std::function<bool(const RecordType&)> Predicate;
//< typedef for a updater function which returns a modification of the
// parameter tuple
typedef typename RDBTable<RecordType, KeyType>::UpdaterFunc UpdaterFunc;
typedef typename Table::UpdaterFunc UpdaterFunc;
//< typedefs for a function performing updates + deletes. Similar to
// UpdaterFunc
//< it allows to update the tuple, but also to delete it (indictated by the
//< setting the bool component of @c UpdateResult to false)
typedef typename RDBTable<RecordType, KeyType>::UpdelFunc UpdelFunc;
typedef typename Table::UpdelFunc UpdelFunc;
typedef typename RDBTable<RecordType, KeyType>::InsertFunc InsertFunc;
typedef typename Table::InsertFunc InsertFunc;
//< typedef for an iterator to scan the table
typedef typename RDBTable<RecordType, KeyType>::TableIterator TableIterator;
typedef typename Table::TableIterator TableIterator;
//< typedef for a predicate evaluated using a scan: see @TableIterator for
// details
typedef typename RDBTable<RecordType, KeyType>::Predicate Predicate;
typedef typename Table::Predicate Predicate;
TxTable(const TableInfo& tInfo) throw(TableException)
: BaseTable(tInfo), rdbTable(tInfo) {
: BaseTable(tInfo), tbl(tInfo) {
}
/**
* Constructor for creating an empty table.
*/
TxTable(const std::string& tableName) throw(TableException)
: rdbTable(tableName) {
: tbl(tableName) {
}
/**
......@@ -108,13 +119,13 @@ class TxTable : public BaseTable {
for (auto iter = logBuffer.begin(txID); iter != logBuffer.end(txID); iter++) {
switch (iter->logOp) {
case LogOp::Insert:
rdbTable.insert(iter->key, *(iter->recordPtr));
tbl.insert(iter->key, *(iter->recordPtr));
break;
case LogOp::Update:
// TODO
break;
case LogOp::Delete:
rdbTable.deleteByKey(iter->key);
tbl.deleteByKey(iter->key);
break;
}
}
......@@ -168,7 +179,7 @@ class TxTable : public BaseTable {
*/
unsigned long deleteWhere(Predicate func) {
// TODO: Tx support
return rdbTable.deleteWhere(func);
return tbl.deleteWhere(func);
}
/**
......@@ -187,7 +198,7 @@ class TxTable : public BaseTable {
*/
unsigned long updateOrDeleteByKey(KeyType key, UpdelFunc ufunc, InsertFunc ifunc = nullptr) {
// TODO: Tx support
// return rdbTable.updateOrDeleteByKey(key, ufunc, ifunc);
// return tbl.updateOrDeleteByKey(key, ufunc, ifunc);
return 0;
}
......@@ -205,7 +216,7 @@ class TxTable : public BaseTable {
*/
unsigned long updateByKey(KeyType key, UpdaterFunc ufunc) {
// TODO: Tx support
return rdbTable.updateByKey(key, ufunc);
return tbl.updateByKey(key, ufunc);
}
/**
......@@ -222,7 +233,7 @@ class TxTable : public BaseTable {
*/
unsigned long updateWhere(Predicate pfunc, UpdaterFunc ufunc) {
// TODO: Tx support
return rdbTable.updateWhere(pfunc, ufunc);
return tbl.updateWhere(pfunc, ufunc);
}
/**
......@@ -234,7 +245,7 @@ class TxTable : public BaseTable {
* @param key the key value
* @return the tuple associated with the given key
*/
SmartPtr<RecordType> getByKey(KeyType key) throw(TableException) { return rdbTable.getByKey(key); }
SmartPtr<RecordType> getByKey(KeyType key) throw(TableException) { return tbl.getByKey(key); }
/**
* @brief Return a pair of iterators for scanning the table with a
......@@ -254,7 +265,7 @@ class TxTable : public BaseTable {
* @param func a function pointer to a predicate
* @return a pair of iterators
*/
TableIterator select(Predicate func) { return rdbTable.select(func); }
TableIterator select(Predicate func) { return tbl.select(func); }
/**
* @brief Return a pair of iterators for scanning the whole table.
......@@ -269,20 +280,20 @@ class TxTable : public BaseTable {
*
* @return a pair of iterators
*/
TableIterator select() { return rdbTable.select(); }
TableIterator select() { return tbl.select(); }
/**
* @brief Return the number of tuples stored in the table.
*
* @return the number of tuples
*/
unsigned long size() const { return rdbTable.size(); }
unsigned long size() const { return tbl.size(); }
void drop() { rdbTable.drop(); }
void drop() { tbl.drop(); }
private:
std::mutex tblMtx;
RDBTable<RecordType, KeyType> rdbTable;
Table tbl;
LogBuffer<KeyType, RecordType> logBuffer;
};
}
......
......@@ -34,7 +34,6 @@ if (BUILD_TEST_CASES)
do_test(GroupedAggregationTest)
do_test(ZMQSourceTest)
do_test(SeqCEPTest)
do_test(HashMapTableTest)
do_test(ToTableTest)
do_test(FromTableTest)
do_test(ContextTest)
......
......@@ -410,3 +410,50 @@ TEST_CASE("Using a window with and without additional function", "[Window]") {
t3.start(false);
REQUIRE(strm3.str() == expected);
}
TEST_CASE("Building and running a topology with Transactions", "[Transactions]") {
//transaction id, user id, test string, test double
typedef TuplePtr<int, int, std::string, double> T1;
PFabricContext ctx;
//create table
TableInfo tblInfo("TestTable", {}, ColumnInfo::UInt_Type);
auto testTable = ctx.createTxTable<T1::element_type, int>(tblInfo);
//tuple production
StreamGenerator<T1>::Generator streamGen ([](unsigned long n) -> T1 {
return makeTuplePtr((int)n%3, (int)n, (std::string)"test string", (double)n + 0.5);
});
unsigned long num = 10;
//autocommit, else we need to update a state (tracking transactions)
bool autocommit = true;
auto t1 = ctx.createTopology();
//write to table
auto s1 = t1->streamFromGenerator<T1>(streamGen, num)
.assignTransactionID([](auto tp) { return get<0>(tp); })
.keyBy<1, int>()
.toTxTable<int>(testTable, autocommit);
t1->start();
t1->wait();
REQUIRE(testTable->size() == 10);
int tpCnt = 0;
auto t2 = ctx.createTopology();
//read from table
auto s2 = t2->selectFromTxTable<T1, int>(testTable)
.notify([&](auto tp, bool outdated) {
tpCnt++;
});
t2->start();
t2->wait();
REQUIRE(tpCnt == 10);
testTable->drop();
}
add_subdirectory(DEBS2017)
if(BUILD_USE_CASES)
add_subdirectory(DEBS2017)
# add_subdirectory(MatrixProcessing)
add_subdirectory(FreqTrajectories)
add_subdirectory(TxSupport)
add_subdirectory(FreqTrajectories)
add_subdirectory(TxSupport)
endif()
if (NOT BUILD_ONLY_LIBS)
add_executable(TrackerServer TrackerServer.cpp
TrajectoryDB.cpp
GeoUtils.cpp
QueryLoop.cpp
Pattern.cpp
PrefixSpan.cpp
WebServer.cpp)
add_executable(TrackerServer TrackerServer.cpp
TrajectoryDB.cpp
GeoUtils.cpp
QueryLoop.cpp
Pattern.cpp
PrefixSpan.cpp
WebServer.cpp
)
target_link_libraries(TrackerServer
pfabric_qcomp
pfabric_core
${ROCKSDB_LIB}
${Boost_PROGRAM_OPTIONS_LIBRARY}
${Boost_DATE_TIME_LIBRARY}
${Boost_FILESYSTEM_LIBRARY}
${Boost_SYSTEM_LIBRARY}
${Boost_LOG_LIBRARY}
${Boost_FILESYSTEM_LIBRARY}
${Boost_THREAD_LIBRARY}
)
endif()
target_link_libraries(TrackerServer
pfabric_qcomp
pfabric_core
${ROCKSDB_LIB}
${Boost_PROGRAM_OPTIONS_LIBRARY}
${Boost_DATE_TIME_LIBRARY}
${Boost_FILESYSTEM_LIBRARY}
${Boost_SYSTEM_LIBRARY}
${Boost_LOG_LIBRARY}
${Boost_FILESYSTEM_LIBRARY}
${Boost_THREAD_LIBRARY}
)
if (NOT BUILD_ONLY_LIBS)
add_executable(txproc TxProcessing.cpp)
target_link_libraries(txproc
pfabric_core
......@@ -11,4 +10,3 @@ target_link_libraries(txproc
${Boost_FILESYSTEM_LIBRARY}
${Boost_THREAD_LIBRARY}
)
endif()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment