Commit 58d6a998 authored by Constantin Pohl

Added optional Apache Kafka support, minor fixes

parent 2ee2ad8a
## Additional network sources ##
ZeroMQ and REST already exist as network sources, providing tuples via network connection.
In addition, AMQP (Advanced Message Queuing Protocol, used by RabbitMQ) as well as the Apache
Kafka protocol can be used as a source. However, there are additional libraries/installs necessary
to run these protocols which are not delivered with PipeFabric by default.
## RabbitMQ ##
### Preliminaries and Installation ###
For AMQP (RabbitMQ):
+ [RabbitMQ Server](https://www.rabbitmq.com/download.html)
+ [Amqpcpp library](https://github.com/akalend/amqpcpp), available on GitHub
The server is necessary for the realization of AMQP, while the Amqpcpp library allows using the
server within C++. On Linux, the server is usually available in the standard repositories, so you
can easily install it with the command `sudo apt-get install rabbitmq-server`.
After the RabbitMQ server and the Amqpcpp library are installed and added to your path (in a way
that CMake can find them), you have to enable the RabbitMQ source for PipeFabric by switching the
CMake variable `USE_RABBITMQ` to `ON`. This can be done manually in the CMakeLists.txt file in the
src folder or by passing `-DUSE_RABBITMQ=ON` to cmake, like `cmake -DUSE_RABBITMQ=ON ../src`.
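For example, a minimal out-of-source build could look like this (the `build` folder and the final
`make` step are assumptions about your local setup):

```
$ mkdir build && cd build
$ cmake -DUSE_RABBITMQ=ON ../src
$ make
```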
In addition, you have to start the RabbitMQ server before running the test case. This can be done
on the console by the command `service rabbitmq-server start`. Otherwise the test case will throw
an error, namely `AMQP cannot create socket`.
### Usage ###
PipeFabric provides an interface to the RabbitMQ server with which all currently available messages
on the server are gathered, transformed into tuples and forwarded to the query. This is done by
using the operator `newStreamFromRabbitMQ`:
`Pipe<TStringPtr> Topology::newStreamFromRabbitMQ(const std::string& info, const std::string& queueName)`
Each incoming message of the RabbitMQ service produces a single tuple (consisting of a single
string). The parameter `info` declares the connection to the server. Usually, when the RabbitMQ
server is started without modifications, a default user named "guest" with the password "guest"
exists on the system. The `info` parameter can then be entered as `guest:guest@localhost:5672`,
where 5672 is the used port. The parameter `queueName` describes the queue on which messages are
exchanged.
The operator currently checks once if there are messages (tuples) available on the RabbitMQ server.
If yes, all the messages are gathered and sent downstream to the subscribers (that means, the
following operator(s)). Then it finishes. However, it can easily be adapted to keep waiting,
repeatedly asking the server whether new messages have arrived.
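As a small usage sketch (assuming a local RabbitMQ server with the default "guest" account and a
queue named "queue"; the tuple schema and the comma separator are just examples, mirroring the test
case), a query could look like this:

```
#include "dsl/Topology.hpp"
#include "dsl/Pipe.hpp"
#include "dsl/PFabricContext.hpp"

using namespace pfabric;

int main() {
  // example schema: messages look like "1,1.5"
  typedef TuplePtr<int, double> InTuplePtr;

  PFabricContext ctx;
  auto t = ctx.createTopology();

  // gather all messages currently available on the queue "queue"
  // and print the extracted tuples
  auto s = t->newStreamFromRabbitMQ("guest:guest@localhost:5672", "queue")
    .extract<InTuplePtr>(',')
    .print();

  t->start(false);
}
```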
## Apache Kafka ##
### Preliminaries and Installation ###
For Apache Kafka:
+ [Apache Zookeeper](https://zookeeper.apache.org/)
+ [Apache Kafka server](https://kafka.apache.org/downloads)
+ [Librdkafka](https://github.com/edenhill/librdkafka), C++ Kafka library, available on GitHub
+ [Cppkafka](https://github.com/mfontanini/cppkafka), C++ wrapper around Librdkafka, available on GitHub
The Kafka server is used for exchanging messages and depends on Apache Zookeeper. On most Linux
systems, Zookeeper is available in the standard repositories, so it is possible to use the command
`sudo apt-get install zookeeperd` for installing it. For setting up the Kafka server on Linux
inside your home directory, you can simply do the following on the command line (using wget):
```
$ mkdir -p ~/kafka
$ cd ~/kafka
$ wget http://www-us.apache.org/dist/kafka/0.10.2.0/kafka_2.10-0.10.2.0.tgz
$ tar xvzf kafka_2.10-0.10.2.0.tgz --strip 1
$ ./bin/kafka-server-start.sh ~/kafka/config/server.properties
```
For deleting topics in Apache Kafka, you should edit the server properties located in
`~/kafka/config/server.properties`, removing the `#` in the line `#delete.topic.enable=true`.
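After removing the `#`, the relevant line in `~/kafka/config/server.properties` simply reads:

```
delete.topic.enable=true
```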
The library `librdkafka` provides C++ support for Apache Kafka. The wrapper `cppkafka` builds on
this library, providing a much more user-friendly interface. Both have to be installed in a way
that CMake can find them (libraries and headers).
Apache Kafka is then enabled in PipeFabric by switching the CMake variable `USE_KAFKA` to `ON`.
This can be done manually in the CMakeLists.txt file in the src folder or by passing
`-DUSE_KAFKA=ON` to cmake, like `cmake -DUSE_KAFKA=ON ../src`.
In addition, you have to start the Kafka server before running the test case. This can be done on
the console inside the Kafka folder by the command
`./bin/kafka-server-start.sh ./config/server.properties`. Otherwise the test case will throw an
error, namely `Connection refused - brokers are down`.
### Usage ###
PipeFabric provides an interface to the Kafka server with which all currently available messages
on the server are gathered, transformed into tuples and forwarded to the query. This is done by
using the operator `newStreamFromKafka`:
`Pipe<TStringPtr> Topology::newStreamFromKafka(const std::string& broker, const std::string& topic, const std::string& groupID)`
Each incoming message of the Kafka server produces a single tuple (consisting of a single string).
The parameter `broker` describes a cluster instance on the server; it is possible to use your
localhost. The `topic` is the topic on which the data, that is, the tuples, is exchanged. Finally,
the `groupID` of the consumer describes to which group (of producers and consumers) it belongs.
Kafka automatically destroys groups that have no members left.
The operator currently checks once if there are messages (tuples) available on the Kafka server.
If yes, all the messages are consecutively gathered and sent downstream to the subscribers (that
means, the following operator(s)). Then it finishes. However, it can easily be adapted to keep
waiting, repeatedly asking the server whether new messages have arrived.
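A corresponding sketch (assuming a local broker on port 9092; the topic "PipeFabric" mirrors the
test case, while the group ID "MyGroup" and the tuple schema are arbitrary examples):

```
#include "dsl/Topology.hpp"
#include "dsl/Pipe.hpp"
#include "dsl/PFabricContext.hpp"

using namespace pfabric;

int main() {
  // example schema: messages look like "1,1.5"
  typedef TuplePtr<int, double> InTuplePtr;

  PFabricContext ctx;
  auto t = ctx.createTopology();

  // gather all messages currently available on the topic "PipeFabric"
  // as a consumer in group "MyGroup" and print the extracted tuples
  auto s = t->newStreamFromKafka("127.0.0.1:9092", "PipeFabric", "MyGroup")
    .extract<InTuplePtr>(',')
    .print();

  t->start(false);
}
```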
@@ -30,9 +30,14 @@ option(BUILD_USE_CASES
OFF
)
# Use RabbitMQ as network source
option(USE_RABBITMQ
  "use RabbitMQ as network source"
  OFF
)
# Use Apache Kafka as network source
option(USE_KAFKA
  "use Apache Kafka as network source"
  OFF
)
@@ -311,8 +316,15 @@ set(core_libs
${ZEROMQ_LIBRARIES}
)
#-----------------------------------------------------------------------------------------
#
##########
# RabbitMQ
##########
#
#
if(USE_RABBITMQ)
  add_definitions(-DUSE_RABBITMQ)
  set(core_libs
    ${core_libs}
    amqpcpp
@@ -323,6 +335,26 @@ if(NETWORK_SOURCES)
net/RabbitMQSource.cpp
)
endif()
#-----------------------------------------------------------------------------------------
#
##############
# Apache Kafka
##############
#
#
if(USE_KAFKA)
  add_definitions(-DUSE_KAFKA)
  set(core_libs
    ${core_libs}
    cppkafka
    rdkafka
  )
  set(core_sources
    ${core_sources}
    net/KafkaSource.cpp
  )
endif()
#-----------------------------------------------------------------------------------------
add_library(pfabric_core SHARED
${core_sources}
......
@@ -89,10 +89,10 @@ Pipe<TStringPtr> Topology::newStreamFromFile(const std::string& fname, unsigned
return Pipe<TStringPtr>(dataflow, dataflow->addPublisher(op));
}
#ifdef USE_RABBITMQ
Pipe<TStringPtr> Topology::newStreamFromRabbitMQ(const std::string& info, const std::string& queueName) {
  // create a new RabbitMQSource
  auto op = std::make_shared<RabbitMQSource>(info, queueName);
  // register its start function
  registerStartupFunction(std::bind(&RabbitMQSource::start, op.get()));
  // and create a new pipe; we use a raw pointer here because
@@ -101,6 +101,19 @@ Pipe<TStringPtr> Topology::newStreamFromRabbitMQ(const std::string& info) {
}
#endif
#ifdef USE_KAFKA
Pipe<TStringPtr> Topology::newStreamFromKafka(const std::string& broker, const std::string& topic,
                                              const std::string& groupID) {
  // create a new KafkaSource
  auto op = std::make_shared<KafkaSource>(broker, topic, groupID);
  // register its start function
  registerStartupFunction(std::bind(&KafkaSource::start, op.get()));
  // and create a new pipe; we use a raw pointer here because
  // we want to return a reference to a Pipe object
  return Pipe<TStringPtr>(dataflow, dataflow->addPublisher(op));
}
#endif
Pipe<TStringPtr> Topology::newStreamFromREST(unsigned int port,
                                             const std::string& path,
                                             RESTSource::RESTMethod method,
......
@@ -41,12 +41,15 @@
#include "qop/SelectFromTable.hpp"
#include "qop/StreamGenerator.hpp"
#ifdef SUPPORT_MATRICES
  #include "qop/FromMatrix.hpp"
#endif
#include "dsl/Pipe.hpp"
#include "dsl/Dataflow.hpp"
#ifdef USE_RABBITMQ
  #include "net/RabbitMQSource.hpp"
#endif
#ifdef USE_KAFKA
  #include "net/KafkaSource.hpp"
#endif
namespace pfabric {
@@ -187,21 +190,43 @@ namespace pfabric {
RESTSource::RESTMethod method,
unsigned short numThreads = 1);
#ifdef USE_RABBITMQ
/**
* @brief Creates a pipe from a RabbitMQ source as input.
*
* Creates a new pipe for receiving tuples via AMQP server (RabbitMQ).
* It reads messages from the AMQP queue and forwards them as tuples
* to the subscribers, as long as there are messages on the server.
*
* @param[in] info
* a string containing password, user, address and port of the server
* format: "password:user@address:port", e.g. "guest:guest@localhost:5672"
* @param[in] queueName
* a string containing the name of the queue for exchanging tuples, e.g. "queue"
* @return
* a new pipe where RabbitMQSource acts as a producer.
*/
Pipe<TStringPtr> newStreamFromRabbitMQ(const std::string& info, const std::string& queueName);
#endif
#ifdef USE_KAFKA
/**
* @brief Creates a pipe from an Apache Kafka source as input.
*
* Creates a new pipe for receiving tuples via Apache Kafka protocol.
*
* @param[in] broker
* the node(s) where the Kafka server runs on,
* e.g. "127.0.0.1:9092" for localhost
* @param[in] topic
* the topic where the data is stored (Kafka property)
* @param[in] groupID
* the ID of the group the consumer belongs to
* @return
* a new pipe where KafkaSource acts as a producer.
*/
Pipe<TStringPtr> newStreamFromKafka(const std::string& broker, const std::string& topic,
const std::string& groupID);
#endif
/**
......
/*
* Copyright (c) 2014-17 The PipeFabric team,
* All Rights Reserved.
*
* This file is part of the PipeFabric package.
*
* PipeFabric is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License (GPL) as
* published by the Free Software Foundation; either version 2 of
* the License, or (at your option) any later version.
*
* This package is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; see the file LICENSE.
* If not you can find the GPL at http://www.gnu.org/copyleft/gpl.html
*/
#include "cppkafka/consumer.h"
#include "cppkafka/configuration.h"
#include "KafkaSource.hpp"
using namespace pfabric;
KafkaSource::KafkaSource(const std::string& broker, const std::string& topic,
                         const std::string& groupID) {
  // create the configuration, necessary to start the consumer
  cppkafka::Configuration config = {
    { "metadata.broker.list", broker },
    { "group.id", groupID },
    { "enable.auto.commit", false }
  };

  consumer = new cppkafka::Consumer(config);
  consumer->subscribe({ topic });

  //(!) without polling once, we will not get the current position of data in the topic
  msg = consumer->poll();
}
KafkaSource::~KafkaSource() {
  // deleting a nullptr is a no-op, so this is safe after start() has run
  delete consumer;
}
unsigned long KafkaSource::start() {
  // refresh (poll)
  msg = consumer->poll();

  // as long as there are messages - also possible to use "while(true)" to stay connected,
  // getting messages sent later on, but for the test case we have to finish some time
  while(msg) {
    // no error handling currently, but there is also the error "eof" from Kafka that is
    // not necessary to handle here
    if (!msg.get_error()) {
      produceTuple(StringRef((char*)msg.get_payload().get_data(),
                             msg.get_payload().get_size()));
      consumer->commit(msg);
    }
    // check if there are more messages waiting
    msg = consumer->poll();
  }

  delete consumer;
  consumer = nullptr; // avoid a double delete in the destructor
  return 0;
}
void KafkaSource::stop() {
}

void KafkaSource::produceTuple(const StringRef& data) {
  auto tn = makeTuplePtr(data);
  this->getOutputDataChannel().publish(tn, false);
}

void KafkaSource::producePunctuation(PunctuationPtr pp) {
  this->getOutputPunctuationChannel().publish(pp);
}
/*
* Copyright (c) 2014-17 The PipeFabric team,
* All Rights Reserved.
*
* This file is part of the PipeFabric package.
*
* PipeFabric is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License (GPL) as
* published by the Free Software Foundation; either version 2 of
* the License, or (at your option) any later version.
*
* This package is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; see the file LICENSE.
* If not you can find the GPL at http://www.gnu.org/copyleft/gpl.html
*/
#ifndef KafkaSource_hpp_
#define KafkaSource_hpp_
#include "core/Punctuation.hpp"
#include "core/Tuple.hpp"
#include "qop/DataSource.hpp"
#include "pubsub/channels/ConnectChannels.hpp"
#include "qop/BaseOp.hpp"
#include "qop/OperatorMacros.hpp"
#include "qop/TextFileSource.hpp"
#include "cppkafka/consumer.h"
#include "cppkafka/configuration.h"
namespace pfabric {

  /**
   * @brief KafkaSource is a source operator for receiving tuples via Apache Kafka protocol.
   *
   * A KafkaSource is an operator producing a stream of tuples which are received via Apache
   * Kafka protocol. The operator produces a stream of @c TStringPtr elements.
   */
  class KafkaSource : public DataSource<TStringPtr> {
    public:
      PFABRIC_SOURCE_TYPEDEFS(TStringPtr);

      /**
       * @brief Create a new instance of the KafkaSource operator.
       *
       * Create a new KafkaSource operator for receiving stream tuples via Apache Kafka
       * protocol.
       *
       * @param[in] broker
       *    the node(s) where the Kafka server runs on
       * @param[in] topic
       *    the topic where the data is stored
       * @param[in] groupID
       *    the ID of the group the consumer belongs to
       */
      KafkaSource(const std::string& broker, const std::string& topic, const std::string& groupID);

      /**
       * Deallocates all resources.
       */
      ~KafkaSource();

      /**
       * Start the operator by polling the Kafka server.
       *
       * @return always 0
       */
      unsigned long start();

      /**
       * Stop the processing.
       */
      void stop();

    protected:
      cppkafka::Consumer *consumer;
      cppkafka::Message msg;

      void produceTuple(const StringRef& data);
      void producePunctuation(PunctuationPtr pp);
  };
}
#endif
@@ -24,8 +24,9 @@
using namespace pfabric;
RabbitMQSource::RabbitMQSource(const std::string& info, const std::string& queueName) {
  mInfo = info;
  mQueueName = queueName;
}
RabbitMQSource::~RabbitMQSource() {
@@ -33,15 +34,18 @@ RabbitMQSource::~RabbitMQSource() {
unsigned long RabbitMQSource::start() {
  AMQP amqp(mInfo);
  AMQPQueue* que = amqp.createQueue(mQueueName);
  que->Declare();

  uint32_t len = 0;
  char* data;

  //get first tuple
  que->Get(AMQP_NOACK);
  AMQPMessage* m = que->getMessage();
  if(m->getMessageCount() > 0) {
    data = m->getMessage(&len);
    produceTuple(StringRef(data, len));
  }

  //and all the others
  while(m->getMessageCount() > 0) {
......
@@ -52,10 +52,10 @@ namespace pfabric {
* @param[in] info
* a string containing password, user, address and port of the server
* format: "password:user@address:port", e.g. "guest:guest@localhost:5672"
* @param[in] queueName
* a string containing the name of the queue for exchanging tuples, e.g. "queue"
*/
RabbitMQSource(const std::string& info, const std::string& queueName);
/**
* Deallocates all resources.
@@ -76,6 +76,7 @@ namespace pfabric {
protected:
std::string mInfo;
std::string mQueueName;
void produceTuple(const StringRef& data);
void producePunctuation(PunctuationPtr pp);
......
@@ -57,7 +57,11 @@ if (BUILD_TEST_CASES)
do_test(MatrixTest)
endif()
  if(USE_RABBITMQ)
    do_test(RabbitMQSourceTest)
  endif()
  if(USE_KAFKA)
    do_test(KafkaSourceTest)
  endif()
endif()
/*
* Copyright (c) 2014-17 The PipeFabric team,
* All Rights Reserved.
*
* This file is part of the PipeFabric package.
*
* PipeFabric is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License (GPL) as
* published by the Free Software Foundation; either version 2 of
* the License, or (at your option) any later version.
*
* This package is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; see the file LICENSE.
* If not you can find the GPL at http://www.gnu.org/copyleft/gpl.html
*/
#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file
#include "catch.hpp"
#include "cppkafka/producer.h"
#include "cppkafka/configuration.h"
#include <string>
#include <iostream>
#include "dsl/Topology.hpp"
#include "dsl/Pipe.hpp"
#include "dsl/PFabricContext.hpp"
using namespace pfabric;
TEST_CASE("Producing and receiving tuples via Apache Kafka protocol", "[Kafka]") {
//prepare the consuming - necessary to subscribe to the topic before we start
//producing tuples (to set the "pointer" on the current topic data correctly)
typedef TuplePtr<int, double> InTuplePtr;
int resCntr = 0;
std::string grp = "TestGroup"+std::to_string(rand()%100000);
PFabricContext ctx;
auto t = ctx.createTopology();
auto s = t->newStreamFromKafka("127.0.0.1:9092", "PipeFabric", grp)
.extract<InTuplePtr>(',')
.print()
.notify([&resCntr](auto tp, bool outdated) { resCntr++; })
;
//start the producing
std::cout<<"Producing 100 tuples..."<<std::endl;
cppkafka::Configuration config = {
{ "metadata.broker.list", "127.0.0.1:9092" }
};
cppkafka::Producer producer(config);
cppkafka::MessageBuilder builder("PipeFabric");
std::string msg;
for(auto i=0; i<100; i++) { //100 tuples
msg = "";
msg.append(std::to_string(i));
msg.append(",1.5");
builder.payload(msg);
producer.produce(builder);
}
//start the consuming
t->start(false);
REQUIRE(resCntr == 100);
}
@@ -67,7 +67,7 @@ TEST_CASE("Producing and receiving tuples via AMQP and RabbitMQ", "[RabbitMQ]")
PFabricContext ctx;
auto t = ctx.createTopology();
auto s = t->newStreamFromRabbitMQ("guest:guest@localhost:5672")
auto s = t->newStreamFromRabbitMQ("guest:guest@localhost:5672", "queue")
.extract<InTuplePtr>(',')
.notify([&resCntr](auto tp, bool outdated) { resCntr++; })
;
......