Commit 73e8488e authored by Constantin Pohl's avatar Constantin Pohl
Browse files

Added optional MQTT support

parent c5ff965c
......@@ -2,8 +2,9 @@
There already exist ZeroMQ and REST as network sources, providing tuples via network connection.
In addition, the AMQP (Advanced Message Queuing Protocol) used by RabbitMQ as well as the Apache
Kafka protocol can be used as source. However, there are additional libraries/installs necessary to
run the protocols which are not delivered in PipeFabric per default.
Kafka and the MQTT (Message Queue Telemetry Transport) protocol can be used as source. However,
there are additional libraries/installs necessary to run the protocols which are not delivered in
PipeFabric per default.
## RabbitMQ ##
......@@ -109,3 +110,46 @@ waiting, repeatedly asking the server if new messages have arrived.
There is an example (test case) provided how to use it properly which can be found in
`/test/KafkaSourceTest.cpp` of the source folder.
## MQTT ##
### Preliminaries and Installation ###
For MQTT:
+ [Eclipse Mosquitto](https://mosquitto.org/)
+ [Eclipse Paho C](https://github.com/eclipse/paho.mqtt.c.git), available on Github
+ [Eclipse Paho C++](https://github.com/eclipse/paho.mqtt.cpp.git), available on GitHub
The Eclipse Mosquitto delivers the necessities for running an MQTT server. In Linux systems, it is
possible to use the command `sudo apt-get install mosquitto mosquitto-clients` for the server and
the client software. Eclipse Paho provides the libraries and headers for C++ support. Because of
Eclipse Paho C++ references the Eclipse Paho C installation, both are necessary. Both have to be
installed in a way that CMake can find them (libraries and headers).
MQTT is then enabled in PipeFabric by switching the CMake variable `USE_MQTT` to `ON`. This can be
done manually in the CMakeLists.txt file in the src folder or by passing `-DUSE_MQTT=ON` to cmake,
like `cmake -DUSE_MQTT=ON ../src`.
In addition, you have to start the MQTT server before running the test case. This can be done on
console by the command `mosquitto`. Else the test case will throw an error, namely
`MQTT error [-1]: TCP/TLS connect failure`.
### Usage ###
PipeFabric provides an interface to the MQTT server in which all currently available messages on
the server are gathered, transformed to tuples and forwarded to the query. This is done by using
the operator `newStreamFromMQTT`:
`Pipe<TStringPtr> Topology::newStreamFromMQTT(const std::string& conn, const std::string& channel)`
Each incoming message of the MQTT server produces a single tuple (consisting of a single string).
The parameter `conn` describes the server address with port. The channel is just the name of the
topic where the messages are exchanged.
The operator currently checks once if there are messages (tuples) available on the MQTT server.
If yes, all the messages are consecutively gathered and sent downstreams to the subscribers (that
means, the following operator(s)). Then it finishes. However, it can be easily adapted to stay
waiting, repeatedly asking the server if new messages have arrived.
There is an example (test case) provided how to use it properly which can be found in
`/test/MQTTSourceTest.cpp` of the source folder.
......@@ -33,12 +33,17 @@ option(BUILD_USE_CASES
# Use RabbitMQ as network source
option(USE_RABBITMQ
"use RabbitMQ as network source"
OFF
ON
)
# Use Apache Kafka as network source
option(USE_KAFKA
"use Apache Kafka as network source"
OFF
ON
)
# Use MQTT as network source
option(USE_MQTT
"use MQTT as network source"
ON
)
# Use the boost::spirit parser for converting strings to numbers
......@@ -355,6 +360,28 @@ if(USE_KAFKA)
)
endif()
#-----------------------------------------------------------------------------------------
#
######
# MQTT
######
#
#
if(USE_MQTT)
add_definitions(-DUSE_MQTT)
set(core_libs
${core_libs}
paho-mqtt3c
paho-mqtt3cs
paho-mqtt3a
paho-mqtt3as
paho-mqttpp3
)
set(core_sources
${core_sources}
net/MQTTSource.cpp
)
endif()
#-----------------------------------------------------------------------------------------
add_library(pfabric_core SHARED
${core_sources}
......
......@@ -114,6 +114,18 @@ Pipe<TStringPtr> Topology::newStreamFromKafka(const std::string& broker, const s
}
#endif
#ifdef USE_MQTT
Pipe<TStringPtr> Topology::newStreamFromMQTT(const std::string& conn, const std::string& channel) {
// create a new MQTTSource
auto op = std::make_shared<MQTTSource>(conn, channel);
// register it's start function
registerStartupFunction(std::bind(&MQTTSource::start, op.get()));
// and create a new pipe; we use a raw pointer here because
// we want to return a reference to a Pipe object
return Pipe<TStringPtr>(dataflow, dataflow->addPublisher(op));
}
#endif
Pipe<TStringPtr> Topology::newStreamFromREST(unsigned int port,
const std::string& path,
RESTSource::RESTMethod method,
......
......@@ -51,6 +51,9 @@
#ifdef USE_KAFKA
#include "net/KafkaSource.hpp"
#endif
#ifdef USE_MQTT
#include "net/MQTTSource.hpp"
#endif
namespace pfabric {
......@@ -229,6 +232,22 @@ namespace pfabric {
const std::string& groupID);
#endif
#ifdef USE_MQTT
/**
* @brief Creates a pipe from a MQTT source as input.
*
* Creates a new pipe for receiving tuples via MQTT.
*
* @param[in] conn
* server connection info, e.g. "tcp://localhost:1883"
* @param[in] channel
* the name of the channel to listen on
* @return
* a new pipe where MQTTSource acts as a producer.
*/
Pipe<TStringPtr> newStreamFromMQTT(const std::string& conn, const std::string& channel);
#endif
/**
* @brief Creates a pipe from a ZMQ source as input.
*
......
/*
* Copyright (c) 2014-17 The PipeFabric team,
* All Rights Reserved.
*
* This file is part of the PipeFabric package.
*
* PipeFabric is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License (GPL) as
* published by the Free Software Foundation; either version 2 of
* the License, or (at your option) any later version.
*
* This package is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; see the file LICENSE.
* If not you can find the GPL at http://www.gnu.org/copyleft/gpl.html
*/
#include "mqtt/async_client.h"
#include "MQTTSource.hpp"
#include <string>
using namespace pfabric;
MQTTSource::MQTTSource(const std::string& conn, const std::string& channel) {
chan = channel;
mqtt::connect_options connOpts;
connOpts.set_keep_alive_interval(20);
connOpts.set_clean_session(true);
const int id = rand()%100000;
cli = new mqtt::async_client(conn, std::to_string(id));
cli->connect(connOpts)->wait();
cli->start_consuming();
cli->subscribe(channel, 1)->wait();
}
MQTTSource::~MQTTSource() {
}
unsigned long MQTTSource::start() {
mqtt::const_message_ptr msg;
//as long as there are messages to gather from server
while(cli->try_consume_message(&msg)) {
produceTuple(StringRef(msg->to_string().c_str(),
msg->to_string().size()));
}
cli->unsubscribe(chan)->wait();
cli->stop_consuming();
cli->disconnect()->wait();
delete cli;
return 0;
}
void MQTTSource::stop() {
}
void MQTTSource::produceTuple(const StringRef& data) {
auto tn = makeTuplePtr(data);
this->getOutputDataChannel().publish(tn, false);
}
void MQTTSource::producePunctuation(PunctuationPtr pp) {
this->getOutputPunctuationChannel().publish(pp);
}
/*
* Copyright (c) 2014-17 The PipeFabric team,
* All Rights Reserved.
*
* This file is part of the PipeFabric package.
*
* PipeFabric is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License (GPL) as
* published by the Free Software Foundation; either version 2 of
* the License, or (at your option) any later version.
*
* This package is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; see the file LICENSE.
* If not you can find the GPL at http://www.gnu.org/copyleft/gpl.html
*/
#ifndef MQTTSource_hpp_
#define MQTTSource_hpp_
#include "core/Punctuation.hpp"
#include "core/Tuple.hpp"
#include "qop/DataSource.hpp"
#include "pubsub/channels/ConnectChannels.hpp"
#include "qop/BaseOp.hpp"
#include "qop/OperatorMacros.hpp"
#include "qop/TextFileSource.hpp"
#include "mqtt/async_client.h"
namespace pfabric {
/**
* @brief MQTTSource is a source operator for receiving tuples via MQTT.
*
* A MQTTSource is an operator producing a stream of tuples which are received via MQTT.
* The operator produces a stream of @c TStringPtr elements.
*/
class MQTTSource : public DataSource<TStringPtr> {
public:
PFABRIC_SOURCE_TYPEDEFS(TStringPtr);
/**
* @brief Create a new instance of the MQTTSource operator.
*
* Create a new MQTTSource operator for receiving stream tuples via MQTT.
*
* @param[in] conn
* server connection info, e.g. "tcp://localhost:1883"
* @param[in] channel
* the name of the channel to listen on
*/
MQTTSource(const std::string& conn, const std::string& channel);
/**
* Deallocates all resources.
*/
~MQTTSource();
/**
* Start the operator by polling the MQTT server.
*
* @return always 0
*/
unsigned long start();
/**
* Stop the processing.
*/
void stop();
protected:
mqtt::async_client *cli;
std::string chan;
void produceTuple(const StringRef& data);
void producePunctuation(PunctuationPtr pp);
};
}
#endif
......@@ -64,4 +64,8 @@ if (BUILD_TEST_CASES)
if(USE_KAFKA)
do_test(KafkaSourceTest)
endif()
if(USE_MQTT)
do_test(MQTTSourceTest)
endif()
endif()
/*
* Copyright (c) 2014-17 The PipeFabric team,
* All Rights Reserved.
*
* This file is part of the PipeFabric package.
*
* PipeFabric is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License (GPL) as
* published by the Free Software Foundation; either version 2 of
* the License, or (at your option) any later version.
*
* This package is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; see the file LICENSE.
* If not you can find the GPL at http://www.gnu.org/copyleft/gpl.html
*/
#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file
#include "catch.hpp"
#include <string>
#include <iostream>
#include "dsl/Topology.hpp"
#include "dsl/Pipe.hpp"
#include "dsl/PFabricContext.hpp"
#include "mqtt/async_client.h"
using namespace pfabric;
TEST_CASE("Producing and receiving tuples via MQTT", "[MQTT]") {
//prepare the consuming
typedef TuplePtr<int, double> InTuplePtr;
int resCntr = 0;
PFabricContext ctx;
auto t = ctx.createTopology();
auto s = t->newStreamFromMQTT("tcp://localhost:1883", "test_topic")
.extract<InTuplePtr>(',')
.notify([&resCntr](auto tp, bool outdated) { resCntr++; })
;
//start the producing
mqtt::async_client client("tcp://localhost:1883", "producerID");
mqtt::connect_options conopts;
mqtt::token_ptr conntok = client.connect(conopts);
conntok->wait();
for(auto i=0; i<100; i++) {
mqtt::message_ptr pubmsg = mqtt::make_message("test_topic", std::to_string(i)+",1.5");
pubmsg->set_qos(1);
client.publish(pubmsg)->wait_for(std::chrono::seconds(10));
}
conntok = client.disconnect();
conntok->wait();
//start the consuming
t->start(false);
REQUIRE(resCntr == 100);
}
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment