Commit ebc58ac9 authored by Philipp Götze's avatar Philipp Götze
Browse files

Adapted ZMQ parts for version 4.3.1 and newer

parent 191639ca
......@@ -5,7 +5,7 @@ PipeFabric relies on several open source components which have to be installed b
+ [CMake](https://cmake.org/) 3.2 (or newer) build environment
+ [Boost](http://www.boost.org/) 1.61 (or newer) C++ libraries (build all libraries)
+ [ZeroMQ](http://zeromq.org/) socket library (including [zmq.hpp](https://github.com/zeromq/cppzmq/blob/master/zmq.hpp))
+ [ZeroMQ](http://zeromq.org/) 4.3.1 (or newer) socket library (including [zmq.hpp](https://github.com/zeromq/cppzmq/blob/master/zmq.hpp))
+ JeMalloc or TCMalloc library (optional)
There are some additional 3rd party libraries such as [Catch](https://github.com/philsquared/Catch) for testing,
......
......@@ -17,7 +17,7 @@
* along with PipeFabric. If not, see <http://www.gnu.org/licenses/>.
*/
#include <iostream>
#include "ZMQSocket.hpp"
......@@ -27,34 +27,34 @@ using namespace pfabric;
using namespace pfabric::sock;
ZMQSocket::ZMQSocket(const std::string& path, int type, short value, size_t len,
const std::string& name) :
mSocketName(name), mSocketPath(path), mSocketType(type), value(value) {
mCtxPtr = new zmq::context_t(1);
configureSocket(len);
}
const std::string& name) :
mSocketName(name), mSocketPath(path), mSocketType(type), value(value) {
mCtxPtr = new zmq::context_t(1);
configureSocket(len);
}
const std::string& ZMQSocket::getSocketName() const {
return mSocketName;
return mSocketName;
}
void ZMQSocket::setSocketName(const std::string& gate_name) {
this->mSocketName = gate_name;
this->mSocketName = gate_name;
}
const std::string& ZMQSocket::getSocketPath() const {
return mSocketPath;
return mSocketPath;
}
void ZMQSocket::setSocketPath(const std::string& socket_path) {
this->mSocketPath = socket_path;
this->mSocketPath = socket_path;
}
int ZMQSocket::getSocketType() const {
return mSocketType;
return mSocketType;
}
void ZMQSocket::setSocketType(int socket_type) {
this->mSocketType = socket_type;
this->mSocketType = socket_type;
}
ZMQSocket::~ZMQSocket() {
......@@ -65,22 +65,22 @@ ZMQSocket::~ZMQSocket() {
}
void ZMQSocket::configureSocket(size_t len) {
mZMQSockPtr = new zmq::socket_t(*mCtxPtr, mSocketType);
const char *path = mSocketPath.c_str();
switch (mSocketType) {
case ZMQ_PULL:
case ZMQ_PUB:
case ZMQ_REP:
mZMQSockPtr->setsockopt(ZMQ_SNDHWM, &value, sizeof(int));
mZMQSockPtr->bind(path);
break;
case ZMQ_SUB:
//mZMQSockPtr->setsockopt(ZMQ_RCVHWM, &value, sizeof(int));
mZMQSockPtr->setsockopt(ZMQ_SUBSCRIBE, &value, len);
case ZMQ_PUSH:
case ZMQ_REQ:
mZMQSockPtr->connect(path);
}
mZMQSockPtr = new zmq::socket_t(*mCtxPtr, mSocketType);
const char *path = mSocketPath.c_str();
switch (mSocketType) {
case ZMQ_PULL:
case ZMQ_PUB:
case ZMQ_REP:
mZMQSockPtr->setsockopt(ZMQ_SNDHWM, &value, sizeof(int));
mZMQSockPtr->bind(path);
break;
case ZMQ_SUB:
//mZMQSockPtr->setsockopt(ZMQ_RCVHWM, &value, sizeof(int));
mZMQSockPtr->setsockopt(ZMQ_SUBSCRIBE, &value, len);
case ZMQ_PUSH:
case ZMQ_REQ:
mZMQSockPtr->connect(path);
}
int timeout = 2000;
mZMQSockPtr->setsockopt(ZMQ_RCVTIMEO, &timeout, sizeof(timeout));
int linger = 0;
......@@ -88,138 +88,138 @@ void ZMQSocket::configureSocket(size_t len) {
}
void* ZMQSocket::getContext() {
return (void *) (mCtxPtr);
return (void *) (mCtxPtr);
}
bool ZMQSocket::sendString(const std::string& str) {
int ZMQSocket::sendString(const std::string& str) {
#ifndef ZERO_COPY
/**
* This initialisation will generate duplicated messages in the new versions
*/
zmq::message_t msg((void *) str.c_str(), str.length(), NULL); //
/**
* This initialisation will generate duplicated messages in the new versions
*/
zmq::message_t msg((void *) str.c_str(), str.length(), NULL); //
#else
/**
* the best solution to prevent duplicated messages
*/
zmq::message_t msg(str.size());
memcpy(msg.data(), str.data(), str.size());
/**
* the best solution to prevent duplicated messages
*/
zmq::message_t msg(str.size());
memcpy(msg.data(), str.data(), str.size());
#endif
//nanosleep(&tim, NULL);
//message.rebuild((void *) str.c_str(), str.length(), NULL); this also will generate duplicated messages in the new versions
return mZMQSockPtr->send(msg);
//nanosleep(&tim, NULL);
//message.rebuild((void *) str.c_str(), str.length(), NULL); this also will generate duplicated messages in the new versions
return mZMQSockPtr->send(msg, zmq::send_flags::none).value_or(-1);
}
bool ZMQSocket::sendBuffer(const std::vector<uint8_t>& buf) {
int ZMQSocket::sendBuffer(const std::vector<uint8_t>& buf) {
#ifndef ZERO_COPY
zmq::message_t msg((void *) buf.data(), buf.size(), NULL);
zmq::message_t msg((void *) buf.data(), buf.size(), NULL);
#else
zmq::message_t msg(buf.size());
memcpy(msg.data(), buf.data(), buf.size());
zmq::message_t msg(buf.size());
memcpy(msg.data(), buf.data(), buf.size());
#endif
return mZMQSockPtr->send(msg);
return mZMQSockPtr->send(msg, zmq::send_flags::none).value_or(-1);
}
bool ZMQSocket::sendBuffer(char *buf, int len) {
/**
* Here the same
*/
int ZMQSocket::sendBuffer(char *buf, int len) {
/**
* Here the same
*/
#ifndef ZERO_COPY
zmq::message_t msg((void *) buf, len, NULL);
zmq::message_t msg((void *) buf, len, NULL);
#else
zmq::message_t msg(len);
memcpy(msg.data(), buf, len);
zmq::message_t msg(len);
memcpy(msg.data(), buf, len);
#endif
//message.rebuild((void *) buf, len, NULL);
return mZMQSockPtr->send(msg);
//message.rebuild((void *) buf, len, NULL);
return mZMQSockPtr->send(msg, zmq::send_flags::none).value_or(-1);
}
int ZMQSocket::recvString(StringRef& data, bool blocking) {
int retval = -1;
int flags = (blocking == false) ? ZMQ_NOBLOCK : 0;
int retval = -1;
const auto flags = (blocking == false) ? zmq::recv_flags::dontwait : zmq::recv_flags::none;
if (mZMQSockPtr != NULL) {
if (mZMQSockPtr != NULL) {
try {
retval = mZMQSockPtr->recv(&message, flags);
}
catch (zmq::error_t & ex) {
try {
retval = mZMQSockPtr->recv(message, flags).value_or(-1);
}
catch (zmq::error_t & ex) {
if(ex.num() != ETERM)
throw;
return -1;
}
if (retval != 1) {
retval = -1;
std::string error("Failed to receive zeromq message: ");
error.append(zmq_strerror(errno));
throw ZMQSocketException(error);
}
retval = message.size();
if (retval > 0) {
// avoid to copy message string
data.setValues(static_cast<char*>(message.data()), retval);
}
}
return retval;
if (retval != 1) {
retval = -1;
std::string error("Failed to receive zeromq message: ");
error.append(zmq_strerror(errno));
throw ZMQSocketException(error);
}
retval = message.size();
if (retval > 0) {
// avoid to copy message string
data.setValues(static_cast<char*>(message.data()), retval);
}
}
return retval;
}
int ZMQSocket::recvBuffer(char *buf, bool blocking) {
int retval = -1;
int flags = (blocking == false) ? ZMQ_NOBLOCK : 0;
if (mZMQSockPtr != NULL) {
try {
retval = mZMQSockPtr->recv(&message, flags);
}
catch (zmq::error_t & exp) {
//std::cout << "error : " << exp.what() << std::endl;
throw ;
}
if (retval != 1) {
retval = -1;
std::string error("Failed to receive zeromq message: ");
error.append(zmq_strerror(errno));
throw ZMQSocketException(error);
}
retval = message.size();
if (retval > 0) {
memcpy((void*) buf, message.data(), retval);
}
}
return retval;
int retval = -1;
const auto flags = (blocking == false) ? zmq::recv_flags::dontwait : zmq::recv_flags::none;
if (mZMQSockPtr != NULL) {
try {
retval = mZMQSockPtr->recv(message, flags).value_or(-1);
}
catch (zmq::error_t & exp) {
//std::cout << "error : " << exp.what() << std::endl;
throw ;
}
if (retval != 1) {
retval = -1;
std::string error("Failed to receive zeromq message: ");
error.append(zmq_strerror(errno));
throw ZMQSocketException(error);
}
retval = message.size();
if (retval > 0) {
memcpy((void*) buf, message.data(), retval);
}
}
return retval;
}
void ZMQSocket::closeSocket() {
mZMQSockPtr->close();
mZMQSockPtr->close();
}
zmq::message_t& ZMQSocket::recvMessage(bool blocking) {
int retval = -1;
int flags = (blocking == false) ? ZMQ_NOBLOCK : 0;
int retval = -1;
const auto flags = (blocking == false) ? zmq::recv_flags::dontwait : zmq::recv_flags::none;
if (mZMQSockPtr != NULL) {
try {
retval = mZMQSockPtr->recv(&message, flags);
}
catch (zmq::error_t & exp) {
//std::cout << "error : " << exp.what() << std::endl;
throw ;
}
if (retval != 1) {
retval = -1;
std::string error("Failed to receive zeromq message: ");
error.append(zmq_strerror(errno));
throw ZMQSocketException(error);
}
}
if (mZMQSockPtr != NULL) {
try {
retval = mZMQSockPtr->recv(message, flags).value_or(-1);
}
catch (zmq::error_t & exp) {
//std::cout << "error : " << exp.what() << std::endl;
throw ;
}
if (retval != 1) {
retval = -1;
std::string error("Failed to receive zeromq message: ");
error.append(zmq_strerror(errno));
throw ZMQSocketException(error);
}
}
return message;
return message;
}
void ZMQSocket::connect(const std::string& path) {
assert(mSocketType == ZMQ_SUB);
mZMQSockPtr->setsockopt(ZMQ_SUBSCRIBE, "", 0);
mZMQSockPtr->connect(path.c_str());
assert(mSocketType == ZMQ_SUB);
mZMQSockPtr->setsockopt(ZMQ_SUBSCRIBE, "", 0);
mZMQSockPtr->connect(path.c_str());
}
......@@ -153,7 +153,7 @@ public:
* send a string by this socket
* @param string a string to be sent
*/
bool sendString(const std::string &string);
int sendString(const std::string &string);
/**
* receive a string from this socket
* @param string a string to store the result
......@@ -164,9 +164,9 @@ public:
* @param buf a buffer to be sent
* @param len the length of the buffer
*/
bool sendBuffer(char *buf, int len);
int sendBuffer(char *buf, int len);
bool sendBuffer(const std::vector<uint8_t>& buf);
int sendBuffer(const std::vector<uint8_t>& buf);
/**
* configure (create) the socket according to its path and type
......
......@@ -64,7 +64,7 @@ TEST_CASE("Transfer a binary tuple stream via ZMQ", "[ZMQSource][ZMQSink]") {
CREATE_DATA_LINK(deserializer, mockup);
using namespace std::chrono_literals;
std::this_thread::sleep_for(2s);
//std::this_thread::sleep_for(2s);
auto handle = std::async(std::launch::async, [&mockup](){
mockup->start();
......
......@@ -70,7 +70,7 @@ TEST_CASE("Receiving a ascii tuple stream via ZMQSource", "[ZMQSource]") {
for(const std::string &s : input) {
zmq::message_t request (4);
memcpy (request.data (), s.c_str(), 4);
publisher.send (request);
publisher.send(request, zmq::send_flags::none);
}
});
......@@ -121,7 +121,7 @@ TEST_CASE("Receiving a binary tuple stream via ZMQSource", "[ZMQSource]") {
tp->serializeToStream(res);
zmq::message_t request (res.size());
memcpy (request.data (), res.data(), res.size());
publisher.send (request);
publisher.send(request, zmq::send_flags::none);
}
});
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment