Commit ff7ac181 authored by Constantin Pohl's avatar Constantin Pohl
Browse files

Added optional RabbitMQ support

parent 5bd22964
......@@ -22,3 +22,4 @@ operators and utility classes. It consists of the following main components:
+ [Tutorial: Stream partitioning](/documentation/Partitioning.md)
+ [Tutorial: How to build and use a Docker image](/documentation/Docker.md)
+ [PipeFabric Use Cases](/documentation/UseCases.md)
+ [Additional network sources](/documentation/Network.md)
## Additional network sources ##
There already exist ZeroMQ and REST as network sources, providing tuples via network connection.
In addition, the AMQP (Advanced Message Queuing Protocol) used by RabbitMQ can be used as source.
However, there are two additional libraries/installs necessary to run the protocol which are not
delivered in PipeFabric per default:
+ [RabbitMQ Server](https://www.rabbitmq.com/download.html)
+ [Amqpcpp library](https://github.com/akalend/amqpcpp), available on GitHub
The server is necessary for realization of AMQP, while the Amqpcpp library allows using the server
within the C++ language. On Linux side, the server is usually inside of the standard repositories,
so you can easily install it with the command `sudo apt-get install rabbitmq-server`.
### Installation ###
After the RabbitMQ server and the Amqpcpp library are installed and added to your path (in a way
that CMake can find them), you have to enable the network source by switching the CMake variable
`NETWORK_SOURCES` to `ON`. This can be done manually in the CMakeLists.txt file in the src folder
or by passing `-DNETWORK_SOURCES=ON` to cmake, like `cmake -DNETWORK_SOURCES=ON ../src`.
In addition, you have to start the RabbitMQ server before running the test case. This can be done
on console by the command `$service rabbitmq-server start` (without the $). Else the test case
will throw an error.
#### newStreamFromRabbitMQ ####
`Pipe<TStringPtr> Topology::newStreamFromRabbitMQ(const std::string& info)`
This is an operator for receiving tuples via RabbitMQ. Each incoming message of the RabbitMQ
service produces a single tuple (consisting of a single string). The parameter `info` declares the
connection of the server. Usually when the RabbitMQ server is started without modifications, a
user named "guest" with the password "guest" is applied to the system. The `info` parameter can
then be entered as `guest:guest@localhost:5672`, where 5672 is the used port.
The operator checks once if there are messages (tuples) available on the RabbitMQ server. If yes,
all the messages are gathered and sent downstreams to the subscribers (that means, the following
operator(s)). Then it finishes. However, it can be easily adapted to stay waiting, repeatedly
asking the server if new messages have arrived.
......@@ -19,14 +19,22 @@ set(PIPEFABRIC_DIR "/usr/local/pfabric")
# which can be switched off to reduce build time.
# Support Matrix Operations (needs Eigen library to be installed)
option(SUPPORT_MATRICES
"support matrix operations as tuple and state type"
OFF)
option(SUPPORT_MATRICES
"support matrix operations as tuple and state type"
OFF
)
# Build use cases
option(BUILD_USE_CASES
""
OFF)
"build use cases to show functionality examples"
OFF
)
# Use additional network sources, like RabbitMQ
option(NETWORK_SOURCES
"use additional network sources, like RabbitMQ"
OFF
)
# Use the boost::spirit parser for converting strings to numbers
option(USE_BOOST_SPIRIT_PARSER
......@@ -53,13 +61,13 @@ option(BUILD_ONLY_LIBS
# If switched to off, no tests will be build
option(BUILD_TEST_CASES
"build tests for pipefabric functionality"
ON
ON
)
#Build google benchmark library
option(BUILD_GOOGLE_BENCH
"build google benchmark"
ON
ON
)
# Build benchmark test
......@@ -298,13 +306,30 @@ if(BUILD_GOOGLE_BENCH)
)
endif()
set(core_libs
${BOOST_LIBRARIES}
${ZEROMQ_LIBRARIES}
)
if(NETWORK_SOURCES)
add_definitions(-DNETWORK_SOURCES)
set(core_libs
${core_libs}
amqpcpp
rabbitmq
)
set(core_sources
${core_sources}
net/RabbitMQSource.cpp
)
endif()
add_library(pfabric_core SHARED
${core_sources}
)
target_link_libraries(pfabric_core
${BOOST_LIBRARIES}
${ZEROMQ_LIBRARIES}
${core_libs}
)
#-----------------------------------------------------------------------------------------
......
......@@ -89,11 +89,23 @@ Pipe<TStringPtr> Topology::newStreamFromFile(const std::string& fname, unsigned
return Pipe<TStringPtr>(dataflow, dataflow->addPublisher(op));
}
#ifdef NETWORK_SOURCES
Pipe<TStringPtr> Topology::newStreamFromRabbitMQ(const std::string& info) {
// create a new RabbitMQSource
auto op = std::make_shared<RabbitMQSource>(info);
// register it's start function
registerStartupFunction(std::bind(&RabbitMQSource::start, op.get()));
// and create a new pipe; we use a raw pointer here because
// we want to return a reference to a Pipe object
return Pipe<TStringPtr>(dataflow, dataflow->addPublisher(op));
}
#endif
Pipe<TStringPtr> Topology::newStreamFromREST(unsigned int port,
const std::string& path,
RESTSource::RESTMethod method,
unsigned short numThreads) {
// create a new TextFileSource
// create a new RESTSource
auto op = std::make_shared<RESTSource>(port, path, method, numThreads);
// register it's start function
registerStartupFunction(std::bind(&RESTSource::start, op.get()));
......
......@@ -45,6 +45,9 @@
#endif
#include "dsl/Pipe.hpp"
#include "dsl/Dataflow.hpp"
#ifdef NETWORK_SOURCES
#include "net/RabbitMQSource.hpp"
#endif
namespace pfabric {
......@@ -184,6 +187,23 @@ namespace pfabric {
RESTSource::RESTMethod method,
unsigned short numThreads = 1);
#ifdef NETWORK_SOURCES
/**
* @brief Creates a pipe from a RabbitMQ source as input.
*
* Creates a new pipe for receiving tuples via AMQP server (RabbitMQ).
* It reads messages from the AMQP queue and forwards them as tuples
* to the subscribers, as long as there are messages on the server
*
* @param[in] info
* a string containing password, user, address and port of the server
* format: "password:user@address:port", e.g. "guest:guest@localhost:5672"
* @return
* a new pipe where RabbitMQSource acts as a producer.
*/
Pipe<TStringPtr> newStreamFromRabbitMQ(const std::string& info);
#endif
/**
* @brief Creates a pipe from a ZMQ source as input.
*
......
/*
* Copyright (c) 2014-17 The PipeFabric team,
* All Rights Reserved.
*
* This file is part of the PipeFabric package.
*
* PipeFabric is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License (GPL) as
* published by the Free Software Foundation; either version 2 of
* the License, or (at your option) any later version.
*
* This package is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; see the file LICENSE.
* If not you can find the GPL at http://www.gnu.org/copyleft/gpl.html
*/
#include "AMQPcpp.h"
#include "RabbitMQSource.hpp"
using namespace pfabric;
RabbitMQSource::RabbitMQSource(const std::string& info) {
mInfo = info;
}
RabbitMQSource::~RabbitMQSource() {
}
unsigned long RabbitMQSource::start() {
AMQP amqp(mInfo);
AMQPQueue* que = amqp.createQueue("q");
que->Declare();
uint32_t len = 0;
//get first tuple
que->Get(AMQP_NOACK);
AMQPMessage* m = que->getMessage();
char* data = m->getMessage(&len);
produceTuple(StringRef(data, len));
//and all the others
while(m->getMessageCount() > 0) {
que->Get(AMQP_NOACK);
m = que->getMessage();
data = m->getMessage(&len);
produceTuple(StringRef(data, len));
}
return 0;
}
void RabbitMQSource::stop() {
}
void RabbitMQSource::produceTuple(const StringRef& data) {
auto tn = makeTuplePtr(data);
this->getOutputDataChannel().publish(tn, false);
}
void RabbitMQSource::producePunctuation(PunctuationPtr pp) {
this->getOutputPunctuationChannel().publish(pp);
}
/*
* Copyright (c) 2014-17 The PipeFabric team,
* All Rights Reserved.
*
* This file is part of the PipeFabric package.
*
* PipeFabric is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License (GPL) as
* published by the Free Software Foundation; either version 2 of
* the License, or (at your option) any later version.
*
* This package is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; see the file LICENSE.
* If not you can find the GPL at http://www.gnu.org/copyleft/gpl.html
*/
#ifndef RabbitMQSource_hpp_
#define RabbitMQSource_hpp_
#include "core/Punctuation.hpp"
#include "core/Tuple.hpp"
#include "qop/DataSource.hpp"
#include "pubsub/channels/ConnectChannels.hpp"
#include "qop/BaseOp.hpp"
#include "qop/OperatorMacros.hpp"
#include "qop/TextFileSource.hpp"
#include "AMQPcpp.h"
namespace pfabric {
/**
* @brief RabbitMQSource is a source operator for receiving tuples via AMQP interface.
*
* A RabbitMQSource is an operator producing a stream of tuples which are received
* via AMQP interface. The operator produces a stream of @c TStringPtr elements,
* where each element corresponds to a single AMQP message.
*/
class RabbitMQSource : public DataSource<TStringPtr> {
public:
PFABRIC_SOURCE_TYPEDEFS(TStringPtr);
/**
* @brief Create a new instance of the RabbitMQSource operator.
*
* Create a new RabbitMQSource operator for receiving stream tuples via AMQP.
*
* @param[in] info
* a string containing password, user, address and port of the server
* format: "password:user@address:port", e.g. "guest:guest@localhost:5672"
* @param[in] name
* the name of the channel to listen on, e.g. "tupleProducer"
*/
RabbitMQSource(const std::string& info);
/**
* Deallocates all resources.
*/
~RabbitMQSource();
/**
* Start the operator by listing at the given port and address.
*
* @return always 0
*/
unsigned long start();
/**
* Stop the processing.
*/
void stop();
protected:
std::string mInfo;
void produceTuple(const StringRef& data);
void producePunctuation(PunctuationPtr pp);
};
}
#endif
......@@ -21,13 +21,15 @@
#ifndef SHJoin_hpp_
#define SHJoin_hpp_
#include <boost/unordered/unordered_map.hpp>
#include <boost/core/ignore_unused.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/lock_guard.hpp>
#include <unordered_map>
#include "qop/BinaryTransform.hpp"
#include "ElementJoinTraits.hpp"
#include "DefaultElementJoin.hpp"
namespace pfabric {
/**
......@@ -73,12 +75,17 @@ namespace pfabric {
* The type definition for our hash tables: we use the native Boost implementation.
* Because, we allow that stream elements have the same key, we need a multimap here.
*/
typedef boost::unordered_multimap< KeyType, LeftInputStreamElement > LHashTable;
typedef boost::unordered_multimap< KeyType, RightInputStreamElement > RHashTable;
typedef std::unordered_multimap< KeyType, LeftInputStreamElement > LHashTable;
typedef std::unordered_multimap< KeyType, RightInputStreamElement > RHashTable;
/// the join algorithm to be used for concatenating the input elements
typedef ElementJoinTraits< ElementJoinImpl > ElementJoin;
/// a mutex for protecting join processing from concurrent sources
typedef boost::mutex JoinMutex;
/// a scoped lock for the mutex
typedef boost::lock_guard< JoinMutex > Lock;
public:
......@@ -130,10 +137,11 @@ namespace pfabric {
* flag indicating whether the tuple is new or invalidated now
*/
void processLeftDataElement( const LeftInputStreamElement& left, const bool outdated ) {
Lock lock( mMtx );
// 1. insert the tuple in the corresponding hash table or remove it if outdated
auto keyval = mLKeyExtractor( left );
updateHashTable( mLTable, keyval, left, outdated);
updateHashTable( mLTable, keyval, left, outdated, lock );
// 2. find join partners in the other hash table
auto rightEqualElements = mRTable.equal_range(keyval);
......@@ -155,10 +163,11 @@ namespace pfabric {
* flag indicating whether the tuple is new or invalidated now
*/
void processRightDataElement( const RightInputStreamElement& right, const bool outdated ) {
Lock lock( mMtx );
// 1. insert the tuple in the corresponding hash table or remove it if outdated
auto keyval = mRKeyExtractor( right );
updateHashTable( mRTable, keyval, right, outdated);
updateHashTable( mRTable, keyval, right, outdated, lock );
// 2. find join partners in the other hash table
auto leftEqualElements = mLTable.equal_range( keyval );
......@@ -204,7 +213,8 @@ namespace pfabric {
typename StreamElement
>
static void updateHashTable( HashTable& hashTable, const key_t& key,
const StreamElement& newElement, const bool outdated) {
const StreamElement& newElement, const bool outdated, const Lock& lock ) {
boost::ignore_unused( lock );
if( !outdated ) {
hashTable.insert( { key, newElement });
......@@ -250,12 +260,12 @@ namespace pfabric {
}
}
LHashTable mLTable; //< hash table for the lhs stream
RHashTable mRTable; //< hash table for the rhs stream
JoinPredicateFunc mJoinPredicate; //< a pointer to the function implementing the join predicate
LKeyExtractorFunc mLKeyExtractor; //< hash function for the lhs stream
RKeyExtractorFunc mRKeyExtractor; //< hash function for the rhs stream
mutable JoinMutex mMtx;
};
} /* end namespace pfabric */
......
......@@ -5,54 +5,59 @@ include(../../cmake/Testing.cmake.in)
add_definitions( -DTEST_DATA_DIRECTORY="${CMAKE_CURRENT_SOURCE_DIR}/test_data/")
if (BUILD_TEST_CASES)
do_test(FlowTest)
do_test(TupleTest)
do_test(TimestampHelperTest)
do_test(StreamElementTraitsTest)
do_test(SourceTest)
do_test(SinkTest)
do_test(SignalTest)
do_test(SelectChannelParametersTest)
do_test(ChannelGroupTest)
do_test(TextFileSourceTest)
do_test(FlowTest)
do_test(TupleTest)
do_test(TimestampHelperTest)
do_test(StreamElementTraitsTest)
do_test(SourceTest)
do_test(SinkTest)
do_test(SignalTest)
do_test(SelectChannelParametersTest)
do_test(ChannelGroupTest)
do_test(TextFileSourceTest)
do_test(MemorySourceTest)
do_test(RESTSourceTest)
do_test(MapTest)
do_test(WhereTest)
do_test(NotifyTest)
do_test(QueueTest)
do_test(TupleExtractorTest)
do_test(WriterTest)
do_test(WindowTest)
do_test(SHJoinTest)
do_test(TopologyTest)
do_test(RESTSourceTest)
do_test(MapTest)
do_test(WhereTest)
do_test(NotifyTest)
do_test(QueueTest)
do_test(TupleExtractorTest)
do_test(WriterTest)
do_test(WindowTest)
do_test(SHJoinTest)
do_test(TopologyTest)
do_test(TopologyJoinTest)
do_test(TopologyAggregationTest)
do_test(TopologyGroupByTest)
do_test(AggregateFuncTest)
do_test(AggregationTest)
do_test(GroupedAggregationTest)
do_test(ZMQSourceTest)
do_test(SeqCEPTest)
do_test(HashMapTableTest)
do_test(ToTableTest)
do_test(FromTableTest)
do_test(ContextTest)
do_test(CEPDSLTest)
do_test(ZMQPubSubTest)
do_test(PartitionTest)
do_test(BarrierTest)
do_test(StreamGeneratorTest)
do_test(TuplifierTest)
# do_test(BPTreeTest)
do_test(AggregateFuncTest)
do_test(AggregationTest)
do_test(GroupedAggregationTest)
do_test(ZMQSourceTest)
do_test(SeqCEPTest)
do_test(HashMapTableTest)
do_test(ToTableTest)
do_test(FromTableTest)
do_test(ContextTest)
do_test(CEPDSLTest)
do_test(ZMQPubSubTest)
do_test(PartitionTest)
do_test(BarrierTest)
do_test(StreamGeneratorTest)
do_test(TuplifierTest)
#do_test(BPTreeTest)
if (USE_ROCKSDB_TABLE)
do_test(RocksDBTest)
do_test(RDBTableTest)
endif()
if(SUPPORT_MATRICES)
do_test(FromMatrixTest)
do_test(MatrixSliceTest)
do_test(MatrixTest)
endif()
if (USE_ROCKSDB_TABLE)
do_test(RocksDBTest)
do_test(RDBTableTest)
endif()
if(SUPPORT_MATRICES)
do_test(FromMatrixTest)
do_test(MatrixSliceTest)
do_test(MatrixTest)
endif()
if(NETWORK_SOURCES)
do_test(RabbitMQSourceTest)
endif()
endif()
/*
* Copyright (c) 2014-17 The PipeFabric team,
* All Rights Reserved.
*
* This file is part of the PipeFabric package.
*
* PipeFabric is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License (GPL) as
* published by the Free Software Foundation; either version 2 of
* the License, or (at your option) any later version.
*
* This package is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; see the file LICENSE.
* If not you can find the GPL at http://www.gnu.org/copyleft/gpl.html
*/
#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file
#include "catch.hpp"
#include "AMQPcpp.h"
#include <string>
#include <iostream>
#include "core/Tuple.hpp"
#include "dsl/Topology.hpp"
#include "dsl/Pipe.hpp"
#include "dsl/PFabricContext.hpp"
using namespace pfabric;
TEST_CASE("Producing and receiving tuples via AMQP and RabbitMQ", "[RabbitMQ]") {
std::cout<<"Producing 100 tuples..."<<std::endl;
//produce
AMQP amqp("guest:guest@localhost:5672"); //standard
AMQPExchange* ex = amqp.createExchange("tupleProducer");
ex->Declare("tupleProducer", "fanout");
AMQPQueue *qu = amqp.createQueue("queue");
qu->Declare();
qu->Bind("tupleProducer","");
std::string msg;
for(auto i=0; i<100; i++) { //100 tuples
msg = "";
msg.append(std::to_string(i));
msg.append(",1.5");
ex->Publish(msg,"");
}
std::cout<<"Receiving..."<<std::endl;
//consume
typedef TuplePtr<int, double> InTuplePtr;
int resCntr = 0;
PFabricContext ctx;
auto t = ctx.createTopology();
auto s = t->newStreamFromRabbitMQ("guest:guest@localhost:5672")
.extract<InTuplePtr>(',')
.notify([&resCntr](auto tp, bool outdated) { resCntr++; })
;
t->start(false);
REQUIRE(resCntr == 100);
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment