Commit 7f40cecd authored by Kai-Uwe Sattler

transaction support added

parents ce9fc092 767e201b
...@@ -22,3 +22,4 @@ operators and utility classes. It consists of the following main components:
+ [Tutorial: Stream partitioning](/documentation/Partitioning.md)
+ [Tutorial: How to build and use a Docker image](/documentation/Docker.md)
+ [PipeFabric Use Cases](/documentation/UseCases.md)
+ [Additional network sources](/documentation/Network.md)
## Additional network sources ##
ZeroMQ and REST already exist as network sources, providing tuples via network connections.
In addition, AMQP (Advanced Message Queuing Protocol, used by RabbitMQ), Apache Kafka, and
MQTT (Message Queue Telemetry Transport) can be used as sources. However, these protocols
require additional libraries that are not delivered with PipeFabric by default.
## RabbitMQ ##
### Preliminaries and Installation ###
For AMQP (RabbitMQ):
+ [RabbitMQ Server](https://www.rabbitmq.com/download.html)
+ [Amqpcpp library](https://github.com/akalend/amqpcpp), available on GitHub
The server is necessary for realizing AMQP, while the Amqpcpp library allows using the server
from C++. On Linux, the server is usually available in the standard repositories, so you can
easily install it with the command `sudo apt-get install rabbitmq-server`.
After the RabbitMQ server and the Amqpcpp library are installed and added to your path (in a way
that CMake can find them), you have to enable the RabbitMQ source for PipeFabric by switching the
CMake variable `USE_RABBITMQ` to `ON`. This can be done manually in the CMakeLists.txt file in the
src folder or by passing `-DUSE_RABBITMQ=ON` to cmake, like `cmake -DUSE_RABBITMQ=ON ../src`.
In addition, you have to start the RabbitMQ server before running the test case. This can be done
on the console with the command `service rabbitmq-server start`. Otherwise the test case will fail
with the error `AMQP cannot create socket`.
### Usage ###
PipeFabric provides an interface to the RabbitMQ server that gathers all messages currently
available on the server, transforms them into tuples, and forwards them to the query. This is
done by the operator `newStreamFromRabbitMQ`:
`Pipe<TStringPtr> newStreamFromRabbitMQ(const std::string& info, const std::string& queueName)`
Each incoming message of the RabbitMQ service produces a single tuple (consisting of a single
string). The parameter `info` specifies the connection to the server. When the RabbitMQ server
is started without modifications, a default user named "guest" with the password "guest" is
available. The `info` parameter can then be given as `guest:guest@localhost:5672`, where 5672
is the port used. The parameter `queueName` names the queue on which messages are exchanged.
The operator currently checks once whether messages (tuples) are available on the RabbitMQ server.
If so, all of them are gathered and sent downstream to the subscribers (that is, the following
operator(s)). Then it finishes. However, it can easily be adapted to keep waiting and to poll the
server repeatedly for newly arrived messages.
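For orientation, a minimal sketch of a query using this operator could look as follows. It assumes
the usual `PFabricContext`/`Topology` setup from the other tutorials; the queue name "queue" is
only a placeholder, and a RabbitMQ server with the default "guest" account must already be running:

```
#include <iostream>
#include "pfabric.hpp"

using namespace pfabric;

int main() {
  PFabricContext ctx;
  auto t = ctx.createTopology();

  // consume all messages currently available on the local RabbitMQ server
  // (placeholder queue name "queue") and print each resulting tuple
  auto s = t->newStreamFromRabbitMQ("guest:guest@localhost:5672", "queue")
    .print(std::cout);

  t->start();
  return 0;
}
```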
A complete example (test case) showing how to use the operator can be found in
`/test/RabbitMQSourceTest.cpp` in the source folder.
## Apache Kafka ##
### Preliminaries and Installation ###
For Apache Kafka:
+ [Apache Zookeeper](https://zookeeper.apache.org/)
+ [Apache Kafka server](https://kafka.apache.org/downloads)
+ [Librdkafka](https://github.com/edenhill/librdkafka), C++ Kafka library, available on GitHub
+ [Cppkafka](https://github.com/mfontanini/cppkafka), C++ wrapper around Librdkafka, available on GitHub
The Kafka server is used for exchanging messages and depends on Apache Zookeeper. On most
Linux systems, Zookeeper is available in the standard repositories, so it can be installed
with the command `sudo apt-get install zookeeperd`. For setting up the Kafka server on Linux
inside your home directory, you can simply run the following on the command line (using wget):
```
$ mkdir -p ~/kafka
$ cd ~/kafka
$ wget http://www-us.apache.org/dist/kafka/0.10.2.0/kafka_2.10-0.10.2.0.tgz
$ tar xvzf kafka_2.10-0.10.2.0.tgz --strip 1
$ ./bin/kafka-server-start.sh ~/kafka/config/server.properties
```
For deleting topics in Apache Kafka, you should edit the server properties located in
`~/kafka/config/server.properties`, removing the `#` in the line `#delete.topic.enable=true`.
The library `librdkafka` provides C++ support for Apache Kafka. The wrapper `cppkafka`
builds on that library, providing a much more user-friendly interface. Both have to be
installed in a way that CMake can find them (libraries and headers).
Apache Kafka is then enabled in PipeFabric by switching the CMake variable `USE_KAFKA` to `ON`.
This can be done manually in the CMakeLists.txt file in the src folder or by passing
`-DUSE_KAFKA=ON` to cmake, like `cmake -DUSE_KAFKA=ON ../src`.
In addition, you have to start the Kafka server before running the test case. This can be done
on the console inside the Kafka folder with the command
`./bin/kafka-server-start.sh ./config/server.properties`. Otherwise the test case will fail with
the error `Connection refused - brokers are down`.
### Usage ###
PipeFabric provides an interface to the Kafka server that gathers all messages currently
available on the server, transforms them into tuples, and forwards them to the query. This is
done by the operator `newStreamFromKafka`:
`Pipe<TStringPtr> Topology::newStreamFromKafka(const std::string& broker, const std::string& topic, const std::string& groupID)`
Each incoming message of the Kafka server produces a single tuple (consisting of a single string).
The parameter `broker` names a broker instance of the Kafka cluster; your localhost can be used.
The `topic` is the topic on which the data (i.e., the tuples) is exchanged. Finally, the
`groupID` of the consumer describes which group (of producers and consumers) it belongs to.
Kafka automatically destroys groups that have no members left.
The operator currently checks once whether messages (tuples) are available on the Kafka server.
If so, all of them are consecutively gathered and sent downstream to the subscribers (that is,
the following operator(s)). Then it finishes. However, it can easily be adapted to keep waiting
and to poll the server repeatedly for newly arrived messages.
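Analogously to the RabbitMQ sketch above, a minimal query could look as follows; the broker
address, topic, and group ID are only placeholders, and a running Kafka broker on the local
machine is assumed:

```
#include <iostream>
#include "pfabric.hpp"

using namespace pfabric;

int main() {
  PFabricContext ctx;
  auto t = ctx.createTopology();

  // gather all messages currently available under the placeholder topic
  // "mytopic" on the local broker and print each resulting tuple
  auto s = t->newStreamFromKafka("127.0.0.1:9092", "mytopic", "mygroup")
    .print(std::cout);

  t->start();
  return 0;
}
```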
A complete example (test case) showing how to use the operator can be found in
`/test/KafkaSourceTest.cpp` in the source folder.
## MQTT ##
### Preliminaries and Installation ###
For MQTT:
+ [Eclipse Mosquitto](https://mosquitto.org/)
+ [Eclipse Paho C](https://github.com/eclipse/paho.mqtt.c.git), available on GitHub
+ [Eclipse Paho C++](https://github.com/eclipse/paho.mqtt.cpp.git), available on GitHub
Eclipse Mosquitto delivers what is necessary to run an MQTT server. On Linux systems, it is
possible to use the command `sudo apt-get install mosquitto mosquitto-clients` for the server and
the client software. Eclipse Paho provides the libraries and headers for C++ support. Because
Eclipse Paho C++ references the Eclipse Paho C installation, both are necessary, and both have to
be installed in a way that CMake can find them (libraries and headers).
MQTT is then enabled in PipeFabric by switching the CMake variable `USE_MQTT` to `ON`. This can be
done manually in the CMakeLists.txt file in the src folder or by passing `-DUSE_MQTT=ON` to cmake,
like `cmake -DUSE_MQTT=ON ../src`.
In addition, you have to start the MQTT server before running the test case. This can be done on
the console with the command `mosquitto`. Otherwise the test case will fail with the error
`MQTT error [-1]: TCP/TLS connect failure`.
### Usage ###
PipeFabric provides an interface to the MQTT server that gathers all messages currently
available on the server, transforms them into tuples, and forwards them to the query. This is
done by the operator `newStreamFromMQTT`:
`Pipe<TStringPtr> Topology::newStreamFromMQTT(const std::string& conn, const std::string& channel)`
Each incoming message of the MQTT server produces a single tuple (consisting of a single string).
The parameter `conn` describes the server address along with the port. The parameter `channel` is
simply the name of the topic on which messages are exchanged.
The operator currently checks once whether messages (tuples) are available on the MQTT server.
If so, all of them are consecutively gathered and sent downstream to the subscribers (that is,
the following operator(s)). Then it finishes. However, it can easily be adapted to keep waiting
and to poll the server repeatedly for newly arrived messages.
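Following the same pattern, a minimal sketch for MQTT could look like this; the connection string
and channel name are only placeholders, and a running Mosquitto server on the default port 1883
is assumed:

```
#include <iostream>
#include "pfabric.hpp"

using namespace pfabric;

int main() {
  PFabricContext ctx;
  auto t = ctx.createTopology();

  // gather all messages currently available on the placeholder channel
  // "channel" of the local MQTT broker and print each resulting tuple
  auto s = t->newStreamFromMQTT("tcp://localhost:1883", "channel")
    .print(std::cout);

  t->start();
  return 0;
}
```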
A complete example (test case) showing how to use the operator can be found in
`/test/MQTTSourceTest.cpp` in the source folder.
...@@ -15,18 +15,36 @@ include(CTest)
# Installation path
set(PIPEFABRIC_DIR "/usr/local/pfabric")
# The following variables enable or disable additional functionalities,
# which can be switched off to reduce build time.
# Support Matrix Operations (needs Eigen library to be installed)
option(SUPPORT_MATRICES
"support matrix operations as tuple and state type"
OFF
)
# Build use cases
option(BUILD_USE_CASES
"build use cases to show functionality examples"
OFF
)
# Use RabbitMQ as network source
option(USE_RABBITMQ
"use RabbitMQ as network source"
OFF
)
# Use Apache Kafka as network source
option(USE_KAFKA
"use Apache Kafka as network source"
OFF
)
# Use MQTT as network source
option(USE_MQTT
"use MQTT as network source"
OFF
)
# Use the boost::spirit parser for converting strings to numbers
option(USE_BOOST_SPIRIT_PARSER
...@@ -53,13 +71,13 @@ option(BUILD_ONLY_LIBS
# If switched to off, no tests will be built
option(BUILD_TEST_CASES
"build tests for pipefabric functionality"
ON
)
# Build google benchmark library
option(BUILD_GOOGLE_BENCH
"build google benchmark"
ON
)
# Build benchmark test
...@@ -88,7 +106,7 @@ endif()
#CMAKE_FORCE_CXX_COMPILER(icpc "Intel C++ Compiler")
# C++ compiler flags
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14 -Wall -Wno-deprecated -g -O1 -Wsign-compare")
if ("${CMAKE_CXX_COMPILER_ID}" MATCHES "Clang")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-unused-local-typedefs -Wno-#pragma-messages")
elseif("${CMAKE_CXX_COMPILER_ID}" MATCHES "GNU")
...@@ -141,7 +159,7 @@ else()
endif()
########################
# RocksDB database library
########################
#
if (USE_ROCKSDB_TABLE)
...@@ -237,7 +255,7 @@ if(SUPPORT_MATRICES)
####################################
find_package(Eigen3)
if(EIGEN3_FOUND)
message(STATUS "using Eigen3 library for linear algebra")
include_directories(${EIGEN3_INCLUDE_DIR})
else()
message(STATUS "Eigen3 not found")
...@@ -245,7 +263,6 @@ if(SUPPORT_MATRICES)
add_definitions(-DSUPPORT_MATRICES)
endif()
#-----------------------------------------------------------------------------------------
#
# Building PipeFabric core library
...@@ -298,13 +315,79 @@ if(BUILD_GOOGLE_BENCH)
)
endif()
set(core_libs
${BOOST_LIBRARIES}
${ZEROMQ_LIBRARIES}
)
#-----------------------------------------------------------------------------------------
#
##########
# RabbitMQ
##########
#
#
if(USE_RABBITMQ)
add_definitions(-DUSE_RABBITMQ)
set(core_libs
${core_libs}
amqpcpp
rabbitmq
)
set(core_sources
${core_sources}
net/RabbitMQSource.cpp
)
endif()
#-----------------------------------------------------------------------------------------
#
##############
# Apache Kafka
##############
#
#
if(USE_KAFKA)
add_definitions(-DUSE_KAFKA)
set(core_libs
${core_libs}
cppkafka
rdkafka
)
set(core_sources
${core_sources}
net/KafkaSource.cpp
)
endif()
#-----------------------------------------------------------------------------------------
#
######
# MQTT
######
#
#
if(USE_MQTT)
add_definitions(-DUSE_MQTT)
set(core_libs
${core_libs}
paho-mqtt3c
paho-mqtt3cs
paho-mqtt3a
paho-mqtt3as
paho-mqttpp3
)
set(core_sources
${core_sources}
net/MQTTSource.cpp
)
endif()
#-----------------------------------------------------------------------------------------
add_library(pfabric_core SHARED
${core_sources}
)
target_link_libraries(pfabric_core
${core_libs}
)
#-----------------------------------------------------------------------------------------
...@@ -359,10 +442,10 @@ if(BUILD_USE_CASES)
#######################################
find_package(OpenCV REQUIRED)
if(OpenCV_FOUND)
message(STATUS "using OpenCV library for image processing")
include_directories(${OpenCV_INCLUDE_DIR})
else()
message(STATUS "OpenCV not found")
endif()
endif()
add_subdirectory(usecases)
...
...@@ -98,11 +98,48 @@ Pipe<TStringPtr> Topology::newStreamFromFile(const std::string& fname, unsigned
return Pipe<TStringPtr>(dataflow, dataflow->addPublisher(op));
}
#ifdef USE_RABBITMQ
Pipe<TStringPtr> Topology::newStreamFromRabbitMQ(const std::string& info, const std::string& queueName) {
// create a new RabbitMQSource
auto op = std::make_shared<RabbitMQSource>(info, queueName);
// register its start function
registerStartupFunction(std::bind(&RabbitMQSource::start, op.get()));
// and create a new pipe; we use a raw pointer here because
// we want to return a reference to a Pipe object
return Pipe<TStringPtr>(dataflow, dataflow->addPublisher(op));
}
#endif
#ifdef USE_KAFKA
Pipe<TStringPtr> Topology::newStreamFromKafka(const std::string& broker, const std::string& topic,
const std::string& groupID) {
// create a new KafkaSource
auto op = std::make_shared<KafkaSource>(broker, topic, groupID);
// register its start function
registerStartupFunction(std::bind(&KafkaSource::start, op.get()));
// and create a new pipe; we use a raw pointer here because
// we want to return a reference to a Pipe object
return Pipe<TStringPtr>(dataflow, dataflow->addPublisher(op));
}
#endif
#ifdef USE_MQTT
Pipe<TStringPtr> Topology::newStreamFromMQTT(const std::string& conn, const std::string& channel) {
// create a new MQTTSource
auto op = std::make_shared<MQTTSource>(conn, channel);
// register its start function
registerStartupFunction(std::bind(&MQTTSource::start, op.get()));
// and create a new pipe; we use a raw pointer here because
// we want to return a reference to a Pipe object
return Pipe<TStringPtr>(dataflow, dataflow->addPublisher(op));
}
#endif
Pipe<TStringPtr> Topology::newStreamFromREST(unsigned int port,
const std::string& path,
RESTSource::RESTMethod method,
unsigned short numThreads) {
// create a new RESTSource
auto op = std::make_shared<RESTSource>(port, path, method, numThreads);
// register its start function
registerStartupFunction(std::bind(&RESTSource::start, op.get()));
...
...@@ -42,10 +42,19 @@
#include "qop/SelectFromTable.hpp"
#include "qop/StreamGenerator.hpp"
#ifdef SUPPORT_MATRICES
#include "qop/FromMatrix.hpp"
#endif
#include "dsl/Pipe.hpp"
#include "dsl/Dataflow.hpp"
#ifdef USE_RABBITMQ
#include "net/RabbitMQSource.hpp"
#endif
#ifdef USE_KAFKA
#include "net/KafkaSource.hpp"
#endif
#ifdef USE_MQTT
#include "net/MQTTSource.hpp"
#endif
namespace pfabric {
...@@ -198,6 +207,61 @@ namespace pfabric {
RESTSource::RESTMethod method,
unsigned short numThreads = 1);
#ifdef USE_RABBITMQ
/**
* @brief Creates a pipe from a RabbitMQ source as input.
*
* Creates a new pipe for receiving tuples via AMQP server (RabbitMQ).
* It reads messages from the AMQP queue and forwards them as tuples
* to the subscribers, as long as there are messages on the server.
*
* @param[in] info
* a string containing user, password, address, and port of the server
* format: "user:password@address:port", e.g. "guest:guest@localhost:5672"
* @param[in] queueName
* a string containing the name of the queue for exchanging tuples, e.g. "queue"
* @return
* a new pipe where RabbitMQSource acts as a producer.
*/
Pipe<TStringPtr> newStreamFromRabbitMQ(const std::string& info, const std::string& queueName);
#endif
#ifdef USE_KAFKA
/**
* @brief Creates a pipe from an Apache Kafka source as input.
*
* Creates a new pipe for receiving tuples via Apache Kafka protocol.
*
* @param[in] broker
* the node(s) where the Kafka server runs,
* e.g. "127.0.0.1:9092" for localhost
* @param[in] topic
* the topic where the data is stored (Kafka property)
* @param[in] groupID
* the ID of the group the consumer belongs to
* @return
* a new pipe where KafkaSource acts as a producer.
*/
Pipe<TStringPtr> newStreamFromKafka(const std::string& broker, const std::string& topic,
const std::string& groupID);
#endif
#ifdef USE_MQTT
/**
* @brief Creates a pipe from an MQTT source as input.
*
* Creates a new pipe for receiving tuples via MQTT.
*
* @param[in] conn
* server connection info, e.g. "tcp://localhost:1883"
* @param[in] channel
* the name of the channel to listen on
* @return
* a new pipe where MQTTSource acts as a producer.
*/
Pipe<TStringPtr> newStreamFromMQTT(const std::string& conn, const std::string& channel);
#endif
/**
* @brief Creates a pipe from a ZMQ source as input.
*
...
/*
* Copyright (c) 2014-17 The PipeFabric team,
* All Rights Reserved.
*
* This file is part of the PipeFabric package.
*
* PipeFabric is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License (GPL) as
* published by the Free Software Foundation; either version 2 of
* the License, or (at your option) any later version.
*
* This package is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; see the file LICENSE.
* If not you can find the GPL at http://www.gnu.org/copyleft/gpl.html
*/
#include "cppkafka/consumer.h"
#include "cppkafka/configuration.h"
#include "KafkaSource.hpp"
using namespace pfabric;
KafkaSource::KafkaSource(const std::string& broker, const std::string& topic,
const std::string& groupID) {
// create the configuration, just necessary to start the consumer
cppkafka::Configuration config = {
{ "metadata.broker.list", broker },
{ "group.id", groupID },
{ "enable.auto.commit", false }
};
consumer = new cppkafka::Consumer(config);
consumer->subscribe({ topic });
//(!) without polling once, we will not get the current position of data in the topic
msg = consumer->poll();
}
KafkaSource::~KafkaSource() {
}
unsigned long KafkaSource::start() {
//refresh (poll)
msg = consumer->poll();
// as long as there are messages; "while(true)" could be used instead to stay connected
// and receive messages sent later on, but for the test case we have to finish at some point
while(msg) {
// no error handling currently; Kafka also reports an "eof" error that does
// not need to be handled here
if (!msg.get_error()) {
produceTuple(StringRef((char*)msg.get_payload().get_data(),
msg.get_payload().get_size()));
consumer->commit(msg);
}
//check if there are more messages waiting
msg = consumer->poll();
}
delete consumer;
return 0;
}
void KafkaSource::stop() {
}
void KafkaSource::produceTuple(const StringRef& data) {
auto tn = makeTuplePtr(data);
this->getOutputDataChannel().publish(tn, false);
}
void KafkaSource::producePunctuation(PunctuationPtr pp) {
this->getOutputPunctuationChannel().publish(pp);
}
/*
* Copyright (c) 2014-17 The PipeFabric team,
* All Rights Reserved.
*
* This file is part of the PipeFabric package.
*