Commit 855d9138 authored by Kai-Uwe Sattler's avatar Kai-Uwe Sattler
Browse files

Fixes to compile after merge

parents 76aa4c0f 20ace0b7
......@@ -425,6 +425,12 @@ FakesAssemblies/
# VSCode
.vscode/
###################################
#
# JetBrains IDEs
#
###################################
**/.idea
###################################
#
......@@ -505,7 +511,8 @@ $RECYCLE.BIN/
# Misc
#
###################################
# ignore catch.hpp header as it is downloaded automatically
**/catch.hpp
# ignore everything in the build directory
# (explicitly included here because separate .gitignore inside that directory)
......
......@@ -23,7 +23,7 @@ add_custom_command(
# the JSON library
# Fetch the nlohmann/json single-header library at configure time.
# NOTE(review): this diff shows both the old GIT_TAG (master) and the new one
# (develop); only "develop" is in the merged file. Tracking a moving branch
# makes builds non-reproducible — consider pinning a released tag instead.
download_project(PROJ json
GIT_REPOSITORY https://github.com/nlohmann/json.git
GIT_TAG master
GIT_TAG develop
UPDATE_DISCONNECTED 1
QUIET
)
......@@ -68,10 +68,92 @@ add_custom_command(
${SimpleWeb_SOURCE_DIR}
${THIRD_PARTY_DIR}/SimpleWeb)
#--------------------------------------------------------------------------------
# Google Benchmark framework
# Fetch and build the Google Benchmark framework, staging headers and the
# static library under ${THIRD_PARTY_DIR}/benchmark. Only active when the
# BUILD_GOOGLE_BENCH option is ON.
if (BUILD_GOOGLE_BENCH)
# NOTE(review): GIT_TAG master tracks a moving branch — not reproducible;
# consider pinning a benchmark release tag (e.g. v1.x).
download_project(PROJ benchmark
GIT_REPOSITORY https://github.com/google/benchmark.git
GIT_TAG master
UPDATE_DISCONNECTED 1
QUIET
)
# OUTPUT-style custom command: runs only when some target DEPENDS on
# ${THIRD_PARTY_DIR}/benchmark. Configures an in-source Release build, runs
# make, then copies headers and libbenchmark.a into the third-party prefix.
# NOTE(review): $(MAKE) is a Makefile-generator pass-through and will not work
# with Ninja/VS generators; the command also lacks VERBATIM, so argument
# escaping is platform-dependent — confirm the supported generators.
add_custom_command(
OUTPUT ${THIRD_PARTY_DIR}/benchmark
COMMAND ${CMAKE_COMMAND} -E chdir ${benchmark_SOURCE_DIR} cmake -DCMAKE_BUILD_TYPE=Release
COMMAND ${CMAKE_COMMAND} -E chdir ${benchmark_SOURCE_DIR} $(MAKE)
COMMAND ${CMAKE_COMMAND} -E make_directory ${THIRD_PARTY_DIR}/benchmark/include
COMMAND ${CMAKE_COMMAND} -E make_directory ${THIRD_PARTY_DIR}/benchmark/lib
COMMAND ${CMAKE_COMMAND} -E copy_directory
${benchmark_SOURCE_DIR}/include
${THIRD_PARTY_DIR}/benchmark/include
COMMAND ${CMAKE_COMMAND} -E copy
${benchmark_SOURCE_DIR}/src/libbenchmark.a
${THIRD_PARTY_DIR}/benchmark/lib
)
endif()
#--------------------------------------------------------------------------------
# Fetch and build RocksDB (pinned to v5.1.4) as a static library, staging
# headers and librocksdb.a under ${THIRD_PARTY_DIR}/rocksdb. Only active
# when USE_ROCKSDB_TABLE is ON.
if(USE_ROCKSDB_TABLE)
# RocksDB key-value store
download_project(PROJ rocksdb
GIT_REPOSITORY https://github.com/facebook/rocksdb
GIT_TAG v5.1.4
UPDATE_DISCONNECTED 1
QUIET
)
# Build the static library in-source via RocksDB's own Makefile target,
# then copy the public headers and the archive into the third-party prefix.
# NOTE(review): $(MAKE) assumes a Makefile generator; no VERBATIM keyword.
add_custom_command(
OUTPUT ${THIRD_PARTY_DIR}/rocksdb
COMMAND ${CMAKE_COMMAND} -E chdir ${rocksdb_SOURCE_DIR} $(MAKE) static_lib
COMMAND ${CMAKE_COMMAND} -E make_directory ${THIRD_PARTY_DIR}/rocksdb/include
COMMAND ${CMAKE_COMMAND} -E make_directory ${THIRD_PARTY_DIR}/rocksdb/lib
COMMAND ${CMAKE_COMMAND} -E copy_directory
${rocksdb_SOURCE_DIR}/include
${THIRD_PARTY_DIR}/rocksdb/include
COMMAND ${CMAKE_COMMAND} -E copy
${rocksdb_SOURCE_DIR}/librocksdb.a
${THIRD_PARTY_DIR}/rocksdb/lib
)
endif()
#--------------------------------------------------------------------------------
# Fetch the sample data repository used by the use-case examples and copy the
# DEBS2017 dataset into the third-party directory at configure time.
if(BUILD_USE_CASES)
# data for use cases
# NOTE(review): GIT_TAG master — unpinned; the dataset can silently change
# between configures.
download_project(PROJ data
GIT_REPOSITORY https://github.com/dbis-ilm/data.git
GIT_TAG master
UPDATE_DISCONNECTED 1
QUIET
)
# file(COPY) runs at configure time, not build time — presumably intentional
# since the data is only consumed by examples; verify ${PROJECT_BINARY_DIR}/data-src
# is where download_project places the clone.
file(COPY ${PROJECT_BINARY_DIR}/data-src/DEBS2017
DESTINATION ${THIRD_PARTY_DIR}
)
endif()
#--------------------------------------------------------------------------------
# Fetch and build NVML (pmem.io, pinned to 1.3.1-rc2) and the internal PTable
# project, installing both under ${THIRD_PARTY_DIR}. Only active when
# USE_NVML_TABLE is ON.
if(USE_NVML_TABLE)
# Non-Volatile Memory Library (pmem.io)
download_project(PROJ nvml
GIT_REPOSITORY https://github.com/pmem/nvml.git
GIT_TAG 1.3.1-rc2
UPDATE_DISCONNECTED 1
QUIET
)
# Build NVML with its own Makefile and install it into the third-party
# prefix. NOTE(review): $(MAKE) requires a Makefile generator; no VERBATIM.
add_custom_command(
OUTPUT ${THIRD_PARTY_DIR}/nvml
COMMAND ${CMAKE_COMMAND} -E chdir ${nvml_SOURCE_DIR} $(MAKE)
COMMAND ${CMAKE_COMMAND} -E chdir ${nvml_SOURCE_DIR} $(MAKE) install prefix=${THIRD_PARTY_DIR}/nvml
)
# PTable (internal gitlab project) for NVM
# NOTE(review): this repository is on an internal GitLab host — builds outside
# that network will fail here; GIT_TAG master is also unpinned.
download_project(PROJ ptable
GIT_REPOSITORY https://dbgit.prakinf.tu-ilmenau.de/code/PTable.git
GIT_TAG master
UPDATE_DISCONNECTED 1
QUIET
)
# Configure PTable with its own install prefix, then build and install it.
# NOTE(review): this rule does not DEPEND on ${THIRD_PARTY_DIR}/nvml, so the
# build order between NVML and PTable is not enforced — confirm PTable does
# not need NVML installed first.
add_custom_command(
OUTPUT ${THIRD_PARTY_DIR}/ptable
COMMAND ${CMAKE_COMMAND} -E chdir ${ptable_SOURCE_DIR} cmake -DPTABLE_DIR=${THIRD_PARTY_DIR}/ptable src
COMMAND ${CMAKE_COMMAND} -E chdir ${ptable_SOURCE_DIR} $(MAKE) install
)
endif()
......@@ -10,6 +10,7 @@ macro( build_executable arg )
pfabric_core
${Boost_SYSTEM_LIBRARY}
${ROCKSDB_LIB}
${NVML_LIBRARIES}
${BENCHMARK_LIB}
${MALLOC_LIB}
)
......
......@@ -56,7 +56,13 @@ option(USE_BOOST_SPIRIT_PARSER
# If switched to off, it will not be downloaded, saving initial building time.
# Toggle for the RocksDB-backed persistent table implementation.
# NOTE(review): the diff shows both the old default (ON) and the new default
# (OFF); in the merged file only OFF remains — RocksDB is now opt-in, which
# avoids the download/build cost by default.
option(USE_ROCKSDB_TABLE
"use RocksDB for implementing persistent tables"
ON
OFF
)
# Use Non-Volatile Memory Library for implementing tables
option(USE_NVML_TABLE
"use nvml for implementing persistent memory tables"
OFF
)
# Build only pipefabric libraries (cep and core)
......@@ -64,7 +70,7 @@ option(USE_ROCKSDB_TABLE
# Building test cases is independent from this.
# Restrict the build to the two pipefabric libraries (cep and core).
# NOTE(review): the diff shows both the old default (OFF) and the new default
# (ON); only ON remains after the merge — by default, examples/tools are no
# longer built. Confirm this default change is intentional for downstream users.
option(BUILD_ONLY_LIBS
"build only the two pipefabric libraries"
OFF
ON
)
# Build test cases for pipefabric functionality
......@@ -184,6 +190,31 @@ else()
set (ROCKSDB_LIB "")
endif()
########################
# Non-Volatile Memory Library
########################
#
# When NVML tables are enabled, collect the static NVML archives plus the
# PTable shared library into NVML_LIBRARIES and expose their headers;
# otherwise leave NVML_LIBRARIES empty so link lines stay valid either way.
if (USE_NVML_TABLE)
message(STATUS "using NVML based persistent table")
# NOTE(review): add_definitions/include_directories are directory-scoped and
# leak to every target below — modern CMake prefers target_compile_definitions
# and target_include_directories on the consuming targets.
add_definitions(-DUSE_NVML_TABLE)
# Link order: libpmem.a last because the other pmem archives depend on it.
# NOTE(review): mixing static archives (.a) with a shared libptable.so —
# assumes these artifacts were staged by the third-party build rules; nothing
# here enforces that they exist before link time.
set (NVML_LIBRARIES
"${THIRD_PARTY_DIR}/nvml/lib/libpmemblk.a"
"${THIRD_PARTY_DIR}/nvml/lib/libpmemlog.a"
"${THIRD_PARTY_DIR}/nvml/lib/libpmemobj.a"
"${THIRD_PARTY_DIR}/nvml/lib/libpmempool.a"
"${THIRD_PARTY_DIR}/nvml/lib/libpmem.a"
${DYLIB_LIBRARY}
"${THIRD_PARTY_DIR}/ptable/lib/libptable.so"
)
include_directories(
"${THIRD_PARTY_DIR}/nvml/include"
"${THIRD_PARTY_DIR}/ptable/include"
)
else ()
message(STATUS "don't use NVML based persistent table")
# Keep the variable defined (empty) so target_link_libraries(${NVML_LIBRARIES})
# expands to nothing instead of erroring.
set (NVML_LIBRARIES "")
endif()
######################
# Boost C++ library
######################
......@@ -358,6 +389,7 @@ if(USE_KAFKA)
net/KafkaSource.cpp
)
endif()
#-----------------------------------------------------------------------------------------
#
######
......@@ -380,6 +412,19 @@ if(USE_MQTT)
net/MQTTSource.cpp
)
endif()
# When NVML tables are enabled, wire the NVML/PTable artifacts into the core
# library's source and link lists.
if(USE_NVML_TABLE)
# Disable PTable's internal logging in the compiled sources.
# NOTE(review): directory-scoped add_definitions again — leaks to all targets.
add_definitions(-DDO_LOG=0)
# NOTE(review): these are DIRECTORIES being appended to a *sources* list —
# that looks wrong (add_library expects source files). Presumably they are
# meant as order-only dependencies on the custom-command OUTPUTs so the
# third-party builds run first — verify, and consider add_custom_target +
# add_dependencies instead.
set(core_sources
${core_sources}
${THIRD_PARTY_DIR}/nvml
${THIRD_PARTY_DIR}/ptable
)
set(core_libs
${core_libs}
${NVML_LIBRARIES}
)
endif()
#-----------------------------------------------------------------------------------------
add_library(pfabric_core SHARED
......@@ -408,7 +453,28 @@ target_link_libraries(pfabric_cep
#
# Experimental SQL query compiler
#
add_subdirectory(qcomp)
# if(NOT USE_NVML_TABLE)
#-----------------------------------------------------------------------------------------
#
# Building PipeFabric query compiler library
#
# Build the PipeFabric query compiler as a shared library.
# Explicit source list (no file(GLOB)) so new files show up in diffs.
add_library(pfabric_qcomp SHARED
qcomp/Plan.cpp
qcomp/PlanCache.cpp
qcomp/QueryCompiler.cpp
qcomp/TypeManager.cpp
qcomp/UniqueNameGenerator.cpp
qcomp/SQLParser.cpp)
# Link the core library and the Boost components the compiler uses.
# PUBLIC makes the previously keyword-less (legacy) signature explicit while
# preserving its propagating behaviour for existing consumers.
# NOTE(review): ${BOOST_LIBRARIES} (all caps) is likely a typo for
# ${Boost_LIBRARIES} (the variable FindBoost actually sets) — verify which
# one this project defines; an undefined variable expands to nothing here.
# NOTE(review): add_subdirectory(qcomp) appears both before and after this
# block in the diff — if both survive the merge, qcomp is built twice.
target_link_libraries(pfabric_qcomp PUBLIC
pfabric_core
${Boost_PROGRAM_OPTIONS_LIBRARY}
${Boost_FILESYSTEM_LIBRARY}
${BOOST_LIBRARIES}
)
add_subdirectory(qcomp)
# endif()
#-----------------------------------------------------------------------------------------
#
......
......@@ -18,8 +18,11 @@
* along with this program; see the file LICENSE.
* If not you can find the GPL at http://www.gnu.org/copyleft/gpl.html
*/
#include <string>
#include "PFabricContext.hpp"
#include <sstream>
#include "dsl/PFabricContext.hpp"
#include "dsl/Topology.hpp"
using namespace pfabric;
......@@ -41,8 +44,10 @@ TableInfoPtr PFabricContext::getTableInfo(const std::string& tblName) {
return it->second->tableInfo();
}
else {
// TODO: shouldn't we throw an exception here???
std::cout << "table not found: '" << tblName << "' : " << mTableSet.size() << std::endl;
return std::shared_ptr<TableInfo>();
// std::cout << "table not found: '" << tblName << "' : " << mTableSet.size() << std::endl;
//return std::shared_ptr<TableInfo>();
std::stringstream errMsg;
errMsg << "table not found: '" << tblName << "' : " << mTableSet.size() << '\n';
throw TableException(errMsg.str().c_str());
}
}
......@@ -21,10 +21,14 @@
#ifndef PFabricContext_hpp_
#define PFabricContext_hpp_
#include <string>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include "core/PFabricTypes.hpp"
#include "dsl/Dataflow.hpp"
#include "qop/Queue.hpp"
#include "table/Table.hpp"
#include "qop/Queue.hpp"
#include "dsl/Topology.hpp"
......@@ -88,7 +92,7 @@ public:
a pointer to the newly created table
*/
template <typename RecordType, typename KeyType = DefaultKeyType>
std::shared_ptr<Table<RecordType, KeyType>> createTable(const std::string& tblName) throw (TableException) {
std::shared_ptr<Table<RecordType, KeyType>> createTable(const std::string& tblName) noexcept(false) {
// first we check whether the table exists already
auto it = mTableSet.find(tblName);
if (it != mTableSet.end())
......@@ -101,7 +105,7 @@ public:
}
template <typename RecordType, typename KeyType = DefaultKeyType>
std::shared_ptr<Table<RecordType, KeyType>> createTable(const TableInfo& tblInfo) throw (TableException) {
std::shared_ptr<Table<RecordType, KeyType>> createTable(const TableInfo& tblInfo) noexcept(false) {
// first we check whether the table exists already
auto it = mTableSet.find(tblInfo.tableName());
if (it != mTableSet.end())
......@@ -207,12 +211,12 @@ public:
}
private:
typedef std::shared_ptr<BaseTable> BaseTablePtr;
using BaseTablePtr = typename std::shared_ptr<BaseTable>;
std::map<std::string, BaseTablePtr> mTableSet; //< a dictionary collecting all existing tables
std::map<std::string, Dataflow::BaseOpPtr> mStreamSet; //< a dictionary collecting all named streams
#ifdef SUPPORT_MATRICES
typedef std::shared_ptr<BaseMatrix> BaseMatrixPtr;
using BaseMatrixPtr = typename std::shared_ptr<BaseMatrix>;
std::map<std::string, BaseMatrixPtr> matrixMap; //< a dictionary collecting all existing matrix
#endif
};
......
......@@ -156,7 +156,7 @@ class Pipe {
Dataflow::BaseOpIterator getPublishers() { return tailIter; }
template <typename SourceType>
SourceType* castOperator(Dataflow::BaseOpPtr opPtr) throw(TopologyException) {
SourceType* castOperator(Dataflow::BaseOpPtr opPtr) noexcept(false) {
auto pOp = dynamic_cast<SourceType*>(opPtr.get());
if (pOp == nullptr) {
throw TopologyException("Incompatible tuple types in Pipe.");
......@@ -165,7 +165,7 @@ class Pipe {
}
template <typename SourceType>
SourceType* castOperator(BaseOp* opPtr) throw(TopologyException) {
SourceType* castOperator(BaseOp* opPtr) noexcept(false) {
auto pOp = dynamic_cast<SourceType*>(opPtr);
if (pOp == nullptr) {
throw TopologyException("Incompatible tuple types in Pipe.");
......@@ -174,8 +174,7 @@ class Pipe {
}
template <typename Publisher, typename SourceType>
OpIterator addPublisher(std::shared_ptr<Publisher> op) throw(
TopologyException) {
OpIterator addPublisher(std::shared_ptr<Publisher> op) noexcept(false) {
auto pOp = castOperator<SourceType>(getPublisher());
CREATE_LINK(pOp, op);
return dataflow->addPublisher(op);
......@@ -184,7 +183,7 @@ class Pipe {
template <typename T2, typename KeyType>
OpIterator addPartitionedJoin(
std::vector<std::shared_ptr<SHJoin<T, T2, KeyType>>>& opList, DataSource<T2>* otherOp,
PartitioningState otherPartitioningState) throw(TopologyException) {
PartitioningState otherPartitioningState) noexcept(false) {
typedef typename std::shared_ptr<SHJoin<T, T2, KeyType>> JoinOpPtr;
if (partitioningState == NoPartitioning)
throw TopologyException("Missing partitionBy operator in topology.");
......@@ -249,7 +248,7 @@ class Pipe {
template <typename Publisher, typename StreamElement>
OpIterator addPartitionedPublisher(std::vector<std::shared_ptr<Publisher>>&
opList) throw(TopologyException) {
opList) noexcept(false) {
if (partitioningState == NoPartitioning)
throw TopologyException("Missing partitionBy operator in topology.");
......@@ -398,7 +397,7 @@ class Pipe {
*/
Pipe<T> slidingWindow(const WindowParams::WinType& wt, const unsigned int sz,
typename Window<T>::WindowOpFunc windowFunc = nullptr,
const unsigned int ei = 0) throw(TableException) {
const unsigned int ei = 0) noexcept(false) {
typedef typename Window<T>::TimestampExtractorFunc ExtractorFunc;
ExtractorFunc fn;
......@@ -459,7 +458,7 @@ class Pipe {
*/
Pipe<T> tumblingWindow(const WindowParams::WinType& wt,
const unsigned int sz,
typename Window<T>::WindowOpFunc windowFunc = nullptr) throw(TableException) {
typename Window<T>::WindowOpFunc windowFunc = nullptr) noexcept(false) {
typedef typename Window<T>::TimestampExtractorFunc ExtractorFunc;
ExtractorFunc fn;
......@@ -519,7 +518,7 @@ class Pipe {
Pipe<T> print(
std::ostream& os = std::cout,
typename ConsoleWriter<T>::FormatterFunc ffun =
ConsoleWriter<T>::defaultFormatter) throw(TopologyException) {
ConsoleWriter<T>::defaultFormatter) noexcept(false) {
assert(partitioningState == NoPartitioning);
auto op = std::make_shared<ConsoleWriter<T>>(os, ffun);
auto pOp = castOperator<DataSource<T>>(getPublisher());
......@@ -548,7 +547,7 @@ class Pipe {
Pipe<T> saveToFile(
const std::string& fname,
typename FileWriter<T>::FormatterFunc ffun =
ConsoleWriter<T>::defaultFormatter) throw(TopologyException) {
ConsoleWriter<T>::defaultFormatter) noexcept(false) {
assert(partitioningState == NoPartitioning);
auto op = std::make_shared<FileWriter<T>>(fname, ffun);
auto pOp = castOperator<DataSource<T>>(getPublisher());
......@@ -579,7 +578,7 @@ class Pipe {
Pipe<T> sendZMQ(const std::string& path,
ZMQParams::SinkType stype = ZMQParams::PublisherSink,
ZMQParams::EncodingMode mode =
ZMQParams::BinaryMode) throw(TopologyException) {
ZMQParams::BinaryMode) noexcept(false) {
if (partitioningState == NoPartitioning) {
auto op = std::make_shared<ZMQSink<T>>(path, stype, mode);
auto pOp = castOperator<DataSource<T>>(getPublisher());
......@@ -614,7 +613,7 @@ class Pipe {
* @return a new pipe
*/
template <class Tout>
Pipe<Tout> extract(char sep) throw(TopologyException) {
Pipe<Tout> extract(char sep) noexcept(false) {
if (partitioningState == NoPartitioning) {
auto op = std::make_shared<TupleExtractor<Tout>>(sep);
auto iter =
......@@ -645,8 +644,7 @@ class Pipe {
* @return a new pipe
*/
template <class Tout>
Pipe<Tout> extractJson(const std::initializer_list<std::string>& keys) throw(
TopologyException) {
Pipe<Tout> extractJson(const std::initializer_list<std::string>& keys) noexcept(false) {
std::vector<std::string> keyList(keys);
if (partitioningState == NoPartitioning) {
auto op = std::make_shared<JsonExtractor<Tout>>(keyList);
......@@ -667,7 +665,7 @@ class Pipe {
/**
* TODO
*/
Pipe<BatchPtr<T>> batch(std::size_t bsize = SIZE_MAX) throw(TopologyException) {
Pipe<BatchPtr<T>> batch(std::size_t bsize = SIZE_MAX) noexcept(false) {
auto op = std::make_shared<Batcher<T>>(bsize);
auto iter = addPublisher<Batcher<T>, DataSource<T>>(op);
return Pipe<BatchPtr<T>>(dataflow, iter, keyExtractor, timestampExtractor, transactionIDExtractor,
......@@ -692,7 +690,7 @@ class Pipe {
* @return a new pipe
*/
template <class Tout>
Pipe<Tout> deserialize() throw(TopologyException) {
Pipe<Tout> deserialize() noexcept(false) {
if (partitioningState == NoPartitioning) {
auto op = std::make_shared<TupleDeserializer<Tout>>();
auto iter =
......@@ -727,8 +725,7 @@ class Pipe {
* value for the input tuple
* @return a new pipe
*/
Pipe<T> where(typename Where<T>::PredicateFunc func) throw(
TopologyException) {
Pipe<T> where(typename Where<T>::PredicateFunc func) noexcept(false) {
if (partitioningState == NoPartitioning) {
auto op = std::make_shared<Where<T>>(func);
auto iter = addPublisher<Where<T>, DataSource<T>>(op);
......@@ -766,7 +763,7 @@ class Pipe {
*/
Pipe<T> notify(typename Notify<T>::CallbackFunc func,
typename Notify<T>::PunctuationCallbackFunc pfunc =
nullptr) throw(TopologyException) {
nullptr) noexcept(false) {
assert(partitioningState == NoPartitioning);
auto op = std::make_shared<Notify<T>>(func, pfunc);
......@@ -787,7 +784,7 @@ class Pipe {
* the input tuple type (usually a TuplePtr) for the operator.
* @return a new pipe
*/
Pipe<T> queue() throw(TopologyException) {
Pipe<T> queue() noexcept(false) {
if (partitioningState == NoPartitioning) {
auto op = std::make_shared<Queue<T>>();
auto iter = addPublisher<Queue<T>, DataSource<T>>(op);
......@@ -817,7 +814,7 @@ class Pipe {
* the named stream object to which the tuples are sent
* @return a new pipe
*/
Pipe<T> toStream(Dataflow::BaseOpPtr stream) throw(TopologyException) {
Pipe<T> toStream(Dataflow::BaseOpPtr stream) noexcept(false) {
assert(partitioningState == NoPartitioning);
auto queueOp = castOperator<Queue<T>>(stream);
auto pOp = castOperator<DataSource<T>>(getPublisher());
......@@ -845,7 +842,7 @@ class Pipe {
* @return new pipe
*/
template <typename Tout>
Pipe<Tout> map(typename Map<T, Tout>::MapFunc func) throw(TopologyException) {
Pipe<Tout> map(typename Map<T, Tout>::MapFunc func) noexcept(false) {
if (partitioningState == NoPartitioning) {
auto op = std::make_shared<Map<T, Tout>>(func);
auto iter = addPublisher<Map<T, Tout>, DataSource<T>>(op);
......@@ -865,7 +862,7 @@ class Pipe {
template <typename Tout>
Pipe<Tout> tuplify(const std::initializer_list<std::string>& predList, TuplifierParams::TuplifyMode m,
unsigned int ws = 0) throw(TopologyException) {
unsigned int ws = 0) noexcept(false) {
if (partitioningState == NoPartitioning) {
auto op = std::make_shared<Tuplifier<T, Tout>>(predList, m, ws);
auto iter = addPublisher<Tuplifier<T, Tout>, DataSource<T>>(op);
......@@ -905,7 +902,7 @@ class Pipe {
*/
template <typename Tout, typename State>
Pipe<Tout> statefulMap(typename StatefulMap<T, Tout, State>::MapFunc
func) throw(TopologyException) {
func) noexcept(false) {
if (partitioningState == NoPartitioning) {
auto op = std::make_shared<StatefulMap<T, Tout, State>>(func);
auto iter = addPublisher<StatefulMap<T, Tout, State>, DataSource<T>>(op);
......@@ -958,7 +955,7 @@ class Pipe {
template <typename AggrState>
Pipe<typename AggrState::ResultTypePtr> aggregate(
AggregationTriggerType tType = TriggerAll,
const unsigned int tInterval = 0) throw(TopologyException) {
const unsigned int tInterval = 0) noexcept(false) {
static_assert(typename AggrStateTraits<AggrState>::type(), "aggregate requires an AggrState class");
return aggregate<typename AggrState::ResultTypePtr, AggrState>(AggrState::finalize, AggrState::iterate,
tType, tInterval);
......@@ -1012,7 +1009,7 @@ class Pipe {
typename Aggregation<T, Tout, AggrState>::FinalFunc finalFun,
typename Aggregation<T, Tout, AggrState>::IterateFunc iterFun,
AggregationTriggerType tType = TriggerAll,
const unsigned int tInterval = 0) throw(TopologyException) {
const unsigned int tInterval = 0) noexcept(false) {
if (partitioningState == NoPartitioning) {
auto op = std::make_shared<Aggregation<T, Tout, AggrState>>(
finalFun, iterFun, tType, tInterval);
......@@ -1060,7 +1057,7 @@ class Pipe {
typename KeyType = DefaultKeyType>
Pipe<typename AggrState::ResultTypePtr> groupBy(
AggregationTriggerType tType = TriggerAll,
const unsigned int tInterval = 0) throw(TopologyException) {
const unsigned int tInterval = 0) noexcept(false) {
static_assert(typename AggrStateTraits<AggrState>::type(), "groupBy requires an AggrState class");
return groupBy<typename AggrState::ResultTypePtr, AggrState, KeyType>(
AggrState::finalize, AggrState::iterate, tType, tInterval);
......@@ -1107,7 +1104,7 @@ class Pipe {
typename GroupedAggregation<T, Tout, AggrState, KeyType>::IterateFunc
iterFun,
AggregationTriggerType tType = TriggerAll,
const unsigned int tInterval = 0) throw(TopologyException) {
const unsigned int tInterval = 0) noexcept(false) {
try {
typedef std::function<KeyType(const T&)> KeyExtractorFunc;
KeyExtractorFunc keyFunc =
......@@ -1166,7 +1163,7 @@ class Pipe {
template <typename Tout, typename RelatedValueType>
Pipe<Tout> matchByNFA(
typename NFAController<T, Tout, RelatedValueType>::NFAControllerPtr
nfa) throw(TopologyException) {
nfa) noexcept(false) {
auto op = std::make_shared<Matcher<T, Tout, RelatedValueType>>(
Matcher<T, Tout, RelatedValueType>::FirstMatch);
op->setNFAController(nfa);
......@@ -1197,8 +1194,7 @@ class Pipe {
* @return a reference to the pipe
*/
template <typename Tout, typename RelatedValueType>
Pipe<Tout> matcher(CEPState<T, RelatedValueType>& expr) throw(
TopologyException) {
Pipe<Tout> matcher(CEPState<T, RelatedValueType>& expr) noexcept(false) {
assert(partitioningState == NoPartitioning);
auto op = std::make_shared<Matcher<T, Tout, RelatedValueType>>(
Matcher<T, Tout, RelatedValueType>::FirstMatch);
......@@ -1237,7 +1233,7 @@ class Pipe {
template <typename KeyType = DefaultKeyType, typename T2>
Pipe<typename SHJoin<T, T2, KeyType>::ResultElement> join(
Pipe<T2>& otherPipe, typename SHJoin<T, T2, KeyType>::JoinPredicateFunc
pred) throw(TopologyException) {
pred) noexcept(false) {
typedef typename SHJoin<T, T2, KeyType>::ResultElement Tout;
try {
typedef std::function<KeyType(const T&)> LKeyExtractorFunc;
......@@ -1428,7 +1424,7 @@ class Pipe {
template <typename KeyType = DefaultKeyType>
Pipe<T> toTable(std::shared_ptr<Table<typename T::element_type, KeyType>> tbl,
bool autoCommit = true) throw(TopologyException) {
bool autoCommit = true) noexcept(false) {
typedef std::function<KeyType(const T&)> KeyExtractorFunc;
assert(partitioningState == NoPartitioning);
......@@ -1477,10 +1473,9 @@ class Pipe {
Pipe<T> updateTable(
std::shared_ptr<Table<typename RecordType::element_type, KeyType>> tbl,
std::function<bool(const T&, bool,
typename RecordType::element_type&)>
updateFunc,
typename RecordType::element_type&)> updateFunc,
std::function<typename RecordType::element_type(const T&)> insertFunc
) throw(TopologyException) {
) noexcept(false) {
typedef std::function<KeyType(const T&)> KeyExtractorFunc;
assert(partitioningState == NoPartitioning);
......@@ -1597,7 +1592,7 @@ class Pipe {
* @return a new pipe
*/
Pipe<T> partitionBy(typename PartitionBy<T>::PartitionFunc pFun,
unsigned int nPartitions) throw(TopologyException) {
unsigned int nPartitions) noexcept(false) {
if (partitioningState != NoPartitioning)
throw TopologyException(
"Cannot partition an already partitioned stream.");
......@@ -1618,7 +1613,7 @@ class Pipe {
* the data stream element type consumed by PartitionBy
* @return a new pipe
*/
Pipe<T> merge() throw(TopologyException) {
Pipe<T> merge() noexcept(false) {
if (partitioningState != NextInPartitioning)
throw TopologyException("Nothing to merge in topology.");
......@@ -1657,7 +1652,7 @@ class Pipe {
*/
Pipe<T> barrier(
std::condition_variable& cVar, std::mutex& mtx,
typename Barrier<T>::PredicateFunc f) throw(TopologyException) {
typename Barrier<T>::PredicateFunc f) noexcept(false) {
if (partitioningState == NoPartitioning) {
auto op = std::make_shared<Barrier<T>>(cVar, mtx, f);
auto iter = addPublisher<Barrier<T>, DataSource<T>>(op);
......
......@@ -342,7 +342,7 @@ namespace pfabric {