Commit 98ffd2a1 authored by Philipp Götze's avatar Philipp Götze
Browse files

Added possibility to synchronize ZMQ PubSub operators

parent 9fb72577
......@@ -26,9 +26,9 @@
using namespace pfabric;
using namespace pfabric::sock;
ZMQSocket::ZMQSocket(const std::string& path, int type, short value, size_t len,
ZMQSocket::ZMQSocket(const std::string& path, const std::string& syncPath, int type, short value, size_t len,
const std::string& name) :
mSocketName(name), mSocketPath(path), mSocketType(type), value(value) {
mSocketName(name), mSocketPath(path), mSocketSyncPath(syncPath), mSocketType(type), value(value) {
mCtxPtr = new zmq::context_t(1);
configureSocket(len);
}
......@@ -60,6 +60,8 @@ void ZMQSocket::setSocketType(int socket_type) {
ZMQSocket::~ZMQSocket() {
if (mZMQSockPtr)
delete mZMQSockPtr;
if (mSocketSyncPath != "")
delete mZMQSyncSockPtr;
if (mCtxPtr)
delete mCtxPtr;
}
......@@ -68,11 +70,18 @@ void ZMQSocket::configureSocket(size_t len) {
mZMQSockPtr = new zmq::socket_t(*mCtxPtr, mSocketType);
const char *path = mSocketPath.c_str();
switch (mSocketType) {
// Fall through cases!
case ZMQ_PULL:
case ZMQ_PUB:
case ZMQ_REP:
mZMQSockPtr->setsockopt(ZMQ_SNDHWM, &value, sizeof(int));
mZMQSockPtr->bind(path);
if(mSocketSyncPath != "") {
mZMQSyncSockPtr = new zmq::socket_t(*mCtxPtr, ZMQ_REP);
mZMQSyncSockPtr->bind(mSocketSyncPath.c_str());
zmq::message_t message(0);
mZMQSyncSockPtr->recv(message, zmq::recv_flags::none);
}
break;
case ZMQ_SUB:
//mZMQSockPtr->setsockopt(ZMQ_RCVHWM, &value, sizeof(int));
......@@ -80,6 +89,12 @@ void ZMQSocket::configureSocket(size_t len) {
case ZMQ_PUSH:
case ZMQ_REQ:
mZMQSockPtr->connect(path);
if(mSocketSyncPath != "") {
mZMQSyncSockPtr = new zmq::socket_t(*mCtxPtr, ZMQ_REQ);
mZMQSyncSockPtr->connect(mSocketSyncPath.c_str());
zmq::message_t message(0);
mZMQSyncSockPtr->send(message, zmq::send_flags::none);
}
}
int timeout = 2000;
mZMQSockPtr->setsockopt(ZMQ_RCVTIMEO, &timeout, sizeof(timeout));
......@@ -197,6 +212,7 @@ zmq::message_t& ZMQSocket::recvMessage(bool blocking) {
int retval = -1;
const auto flags = (blocking == false) ? zmq::recv_flags::dontwait : zmq::recv_flags::none;
std::cout << "Receiving ...\n";
if (mZMQSockPtr != NULL) {
try {
retval = mZMQSockPtr->recv(message, flags).value_or(-1);
......
......@@ -85,6 +85,10 @@ private:
* the path of the socket where the communication will be done
*/
std::string mSocketPath;
/**
* the path of the socket where the synchronization will be done
*/
std::string mSocketSyncPath;
/**
* socket type whether publisher ZMQ_PUB, subscriber ZMQ_SUB, requester ZMQ_REQ or replier ZMQ_REP
*/
......@@ -93,6 +97,10 @@ private:
* the created socket
*/
zmq::socket_t * mZMQSockPtr;
/**
* the created sync socket (possibly nullptr)
*/
zmq::socket_t * mZMQSyncSockPtr;
/**
* socket context
*/
......@@ -108,7 +116,7 @@ public:
/**
* constructor to create the socket according to its path and type
*/
ZMQSocket(const std::string& path, int type, short value = '\0', size_t len = 0,
ZMQSocket(const std::string& path, const std::string& synPath, int type, short value = '\0', size_t len = 0,
const std::string& name = "");
/**
* destructor
......
......@@ -52,15 +52,15 @@ namespace pfabric {
* @param mode the encoding mode for messages (binary, ascii, ...)
* @param tlen the (optional) length of the tuple (in bytes) used for allocating a buffer
*/
ZMQSink(const std::string& path, ZMQParams::SinkType stype = ZMQParams::PublisherSink,
ZMQSink(const std::string& path, const std::string& syncPath = "", ZMQParams::SinkType stype = ZMQParams::PublisherSink,
ZMQParams::EncodingMode mode = ZMQParams::BinaryMode, unsigned int tlen = 1024) :
mMode(mode), mSinkType(stype) {
if (mSinkType == ZMQParams::PublisherSink) {
mSocket = std::make_unique< sock::ZMQSocket >(path, ZMQ_PUB);
mSocket = std::make_unique< sock::ZMQSocket >(path, syncPath, ZMQ_PUB);
// mSync = new ZMQSinkSync(path);
}
else if (mSinkType == ZMQParams::PushSink)
mSocket = std::make_unique< sock::ZMQSocket >(path, ZMQ_PUSH);
mSocket = std::make_unique< sock::ZMQSocket >(path, syncPath, ZMQ_PUSH);
if (mMode == ZMQParams::BinaryMode)
mBuf.resize(tlen);
}
......@@ -81,7 +81,7 @@ namespace pfabric {
/**
* @brief Bind the callback for the punctuation channel.
*/
BIND_INPUT_CHANNEL_DEFAULT( InputPunctuationChannel, ZMQSink, processPunctuation );
BIND_INPUT_CHANNEL_DEFAULT( InputPunctuationChannel, ZMQSink, processPunctuation );
private:
......
......@@ -17,7 +17,6 @@
* along with PipeFabric. If not, see <http://www.gnu.org/licenses/>.
*/
#include "ZMQSource.hpp"
#include <cassert>
......@@ -26,20 +25,21 @@ using namespace pfabric;
ZMQSourceImpl::ZMQSourceImpl(const std::string& path,
ZMQParams::SourceType stype,
ZMQParams::EncodingMode emode,
TStringCallbackFunc const& cb1,
BufCallbackFunc const& cb2,
PunctuationCallbackFunc const& cb3 ) :
mTStringCB(cb1), mBufCB(cb2), mPunctuationCB(cb3),
mNumTuples(0), mMode(emode), mInterrupted(true), /*mSourceThread(nullptr),*/ mType(stype) {
const std::string& syncPath,
ZMQParams::SourceType stype,
ZMQParams::EncodingMode emode,
TStringCallbackFunc const& cb1,
BufCallbackFunc const& cb2,
PunctuationCallbackFunc const& cb3 ) :
mTStringCB(cb1), mBufCB(cb2), mPunctuationCB(cb3),
mNumTuples(0), mMode(emode), mInterrupted(true), /*mSourceThread(nullptr),*/ mType(stype) {
switch (mType) {
case ZMQParams::SubscriberSource:
mSocket = std::make_unique< sock::ZMQSocket >(path, ZMQ_SUB);
mSocket = std::make_unique< sock::ZMQSocket >(path, syncPath, ZMQ_SUB);
break;
case ZMQParams::PullSource:
mSocket = std::make_unique< sock::ZMQSocket >(path, ZMQ_PULL);
mSocket = std::make_unique< sock::ZMQSocket >(path, syncPath, ZMQ_PULL);
break;
default:
BOOST_ASSERT_MSG(false, "unsupported source type for ZMQSource");
......@@ -51,6 +51,7 @@ unsigned long ZMQSourceImpl::process() {
mInterrupted = false;
StringRef result;
while (mInterrupted == false) {
try {
if (mMode == ZMQParams::AsciiMode) {
......@@ -60,14 +61,14 @@ unsigned long ZMQSourceImpl::process() {
}
}
else {
zmq::message_t& msg = mSocket->recvMessage();
if (msg.size() != 0) {
uint8_t *ptr = (uint8_t *) msg.data();
StreamType buf;
buf.assign(ptr, ptr + msg.size());
auto tp = makeTuplePtr((const StreamType&) buf);
mBufCB(tp);
}
zmq::message_t& msg = mSocket->recvMessage();
if (msg.size() != 0) {
uint8_t *ptr = (uint8_t *) msg.data();
StreamType buf;
buf.assign(ptr, ptr + msg.size());
auto tp = makeTuplePtr((const StreamType&) buf);
mBufCB(tp);
}
}
}
catch (sock::ZMQSocketException& exc) {
......@@ -95,7 +96,6 @@ void ZMQSourceImpl::start() {
if (mInterrupted == true) {
std::unique_lock<std::mutex> sync_lock(mStartMtx);
mSourceThread = std::thread(&ZMQSourceImpl::process, this);
}
}
......@@ -103,7 +103,6 @@ void ZMQSourceImpl::start() {
void ZMQSourceImpl::stop() {
// synchronize this method to block until the thread is stopped
std::lock_guard<std::mutex> lock_gd(mZmqMtx);
if (mInterrupted == false) {
mInterrupted = true;
try {
......@@ -121,18 +120,18 @@ bool ZMQSourceImpl::isInterrupted() const {
/*
void ZMQSourceImpl::sync() {
std::string test;
std::string path = this->mSocket->getSocketPath();
std::string::size_type pos = path.find_last_of(':');
int port = atoi(path.substr(pos + 1).c_str());
port += 1;
std::string sync_address = path.substr(0, pos);
std::stringstream sync_path;
sync_path << sync_address << ":"<< port;
std::cout << "try to sync in zmq_source: " << sync_path.str() << std::endl;
sock::ZMQSocket* the_socket = new sock::ZMQSocket(sync_path.str(), ZMQ_REQ);
the_socket->sendString("");
the_socket->recvString(test);
delete the_socket;
std::string test;
std::string path = this->mSocket->getSocketPath();
std::string::size_type pos = path.find_last_of(':');
int port = atoi(path.substr(pos + 1).c_str());
port += 1;
std::string sync_address = path.substr(0, pos);
std::stringstream sync_path;
sync_path << sync_address << ":"<< port;
std::cout << "try to sync in zmq_source: " << sync_path.str() << std::endl;
sock::ZMQSocket* the_socket = new sock::ZMQSocket(sync_path.str(), ZMQ_REQ);
the_socket->sendString("");
the_socket->recvString(test);
delete the_socket;
}
*/
......@@ -56,69 +56,71 @@ public:
typedef std::function< void(TBufPtr) > BufCallbackFunc;
typedef std::function< void(PunctuationPtr) > PunctuationCallbackFunc;
/**
* Constructor for a ZMQSource implementation to assign a path for the socket,
/**
* Constructor for a ZMQSource implementation to assign a path for the socket,
* the type of source, and the encoding.
*
* @param path the socket path where we receive the tuples
* @param stype the type of 0MQ source (pull or sink)
* @param emode the type of encoding (binary or ascii)
* @param cb1 a callback function for receiving string tuples
* @param cb2 a callback function for receiving serialized tuples
* @param cb3 a callback function for receiving punctuations
*/
* @param path the socket path where we receive the tuples
* @param syncPath the socket path where we synchronize sub and pub
* @param stype the type of 0MQ source (pull or sink)
* @param emode the type of encoding (binary or ascii)
* @param cb1 a callback function for receiving string tuples
* @param cb2 a callback function for receiving serialized tuples
* @param cb3 a callback function for receiving punctuations
*/
ZMQSourceImpl(const std::string& path,
const std::string& syncPath,
ZMQParams::SourceType stype,
ZMQParams::EncodingMode emode,
TStringCallbackFunc const& cb1,
TStringCallbackFunc const& cb1,
BufCallbackFunc const& cb2,
PunctuationCallbackFunc const& cb3);
/**
* Destructor for releasing resources.
*/
~ZMQSourceImpl();
/**
* Destructor for releasing resources.
*/
~ZMQSourceImpl();
/**
* Start the processing.
*/
void start();
/**
* Start the processing.
*/
void start();
/**
* Stop the processing.
*/
void stop();
/**
* Stop the processing.
*/
void stop();
/**
* Check whether the processing was interrupted or not.
/**
* Check whether the processing was interrupted or not.
*
* @return true if processing was interrupted
*/
bool isInterrupted() const;
*/
bool isInterrupted() const;
private:
/**
* Try to receive and process incoming tuples.
/**
* Try to receive and process incoming tuples.
*
* @return number of received tuples
*/
unsigned long process();
* @return number of received tuples
*/
unsigned long process();
typedef std::unique_ptr< sock::ZMQSocket > ZMQSocketPtr;
TStringCallbackFunc mTStringCB; //<
BufCallbackFunc mBufCB; //<
PunctuationCallbackFunc mPunctuationCB; //<
TStringCallbackFunc mTStringCB; //<
BufCallbackFunc mBufCB; //<
PunctuationCallbackFunc mPunctuationCB; //<
ZMQSocketPtr mSocket; //< the subscriber socket
int mNumTuples; //< number tuple processed by the socket
ZMQParams::EncodingMode mMode; //< the encoding mode
bool mInterrupted; //< a flag for interrupting
ZMQSocketPtr mSocket; //< the subscriber socket
int mNumTuples; //< number tuple processed by the socket
ZMQParams::EncodingMode mMode; //< the encoding mode
bool mInterrupted; //< a flag for interrupting
std::thread mSourceThread; //< the socket reader thread
mutable std::mutex mZmqMtx; //<
mutable std::mutex mStartMtx; //<
ZMQParams::SourceType mType; //<
mutable std::mutex mZmqMtx; //<
mutable std::mutex mStartMtx; //<
ZMQParams::SourceType mType; //<
};
/**
......@@ -129,36 +131,36 @@ private:
* @tparam Tout
* the data stream element type which is produced by the source
*/
template<typename Tout>
class ZMQSourceBase : public DataSource<Tout> {
template<typename Tout>
class ZMQSourceBase : public DataSource<Tout> {
PFABRIC_SOURCE_TYPEDEFS(Tout)
public:
/**
* Constructor to create a ZMQSourceBase object delegating the
public:
/**
* Constructor to create a ZMQSourceBase object delegating the
* actual processing to a ZMQSourceImpl instance.
*
* @param path the socket path where we receive the tuples
* @param stype the type of 0MQ source (pull or sink)
* @param emode the type of encoding (binary or ascii)
* @param stype the type of 0MQ source (pull or sink)
* @param emode the type of encoding (binary or ascii)
* @param cb1 a callback function for receiving string tuples
* @param cb2 a callback function for receiving serialized tuples
*/
ZMQSourceBase(const std::string& path,
ZMQParams::SourceType stype = ZMQParams::SubscriberSource,
ZMQParams::EncodingMode emode = ZMQParams::BinaryMode,
ZMQSourceImpl::TStringCallbackFunc const& cb1 = nullptr,
ZMQSourceImpl::BufCallbackFunc const& cb2 = nullptr) :
mImpl(new ZMQSourceImpl( path, stype, emode,
cb1, cb2,
std::bind(&ZMQSourceBase::publishPunctuation, this, std::placeholders::_1))) {}
*/
ZMQSourceBase(const std::string& path,
const std::string& syncPath,
ZMQParams::SourceType stype = ZMQParams::SubscriberSource,
ZMQParams::EncodingMode emode = ZMQParams::BinaryMode,
ZMQSourceImpl::TStringCallbackFunc const& cb1 = nullptr,
ZMQSourceImpl::BufCallbackFunc const& cb2 = nullptr) :
mImpl(new ZMQSourceImpl( path, syncPath, stype, emode, cb1, cb2,
std::bind(&ZMQSourceBase::publishPunctuation, this, std::placeholders::_1))) {}
/**
* Stop the processing.
*/
void stop() {
mImpl->stop();
}
void stop() {
mImpl->stop();
}
/**
* Start the processing.
......@@ -170,7 +172,7 @@ private:
return 0;
}
protected:
protected:
/**
* Produce and forward a punctuation tuple. This method is used as callback
......@@ -178,13 +180,13 @@ private:
*
* @param pp the punctuation tuple
*/
void publishPunctuation(PunctuationPtr pp) {
this->getOutputPunctuationChannel().publish(pp);
}
void publishPunctuation(PunctuationPtr pp) {
this->getOutputPunctuationChannel().publish(pp);
}
private:
std::unique_ptr< ZMQSourceImpl > mImpl; //< pointer to the actual implementation
};
private:
std::unique_ptr< ZMQSourceImpl > mImpl; //< pointer to the actual implementation
};
/**
* ZMQSource is a source operator for receiving tuples via 0MQ and produce
......@@ -212,8 +214,8 @@ private:
* @param path the socket path where we receive the tuples
* @param stype the type of 0MQ source (pull or sink)
*/
ZMQSource(const std::string& path, ZMQParams::SourceType stype = ZMQParams::SubscriberSource) :
ZMQSourceBase<TBufPtr>(path, stype, ZMQParams::BinaryMode,
ZMQSource(const std::string& path, const std::string& syncPath = "", ZMQParams::SourceType stype = ZMQParams::SubscriberSource) :
ZMQSourceBase<TBufPtr>(path, syncPath, stype, ZMQParams::BinaryMode,
nullptr,
std::bind(&ZMQSource<TBufPtr>::publishTuple, this, std::placeholders::_1)) {}
......@@ -224,7 +226,7 @@ private:
* @param tp a serialized tuple that is published
*/
void publishTuple(TBufPtr tp) {
this->getOutputDataChannel().publish(tp, false);
this->getOutputDataChannel().publish(tp, false);
}
......@@ -245,8 +247,8 @@ private:
* @param path the socket path where we receive the tuples
* @param stype the type of 0MQ source (pull or sink)
*/
ZMQSource(const std::string& path, ZMQParams::SourceType stype = ZMQParams::SubscriberSource) :
ZMQSourceBase<TStringPtr>(path, stype, ZMQParams::AsciiMode,
ZMQSource(const std::string& path, const std::string& syncPath = "", ZMQParams::SourceType stype = ZMQParams::SubscriberSource) :
ZMQSourceBase<TStringPtr>(path, syncPath, stype, ZMQParams::AsciiMode,
std::bind(&ZMQSource<TStringPtr>::publishTuple, this, std::placeholders::_1),
nullptr) {}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment