Commit 14cff085 authored by Philipp Götze's avatar Philipp Götze
Browse files

Removed most heuristic sleeps

parent 569c94a8
Pipeline #184 failed with stages
in 46 minutes and 29 seconds
...@@ -19,7 +19,7 @@ endmacro(build_executable) ...@@ -19,7 +19,7 @@ endmacro(build_executable)
macro(build_catch_test arg) macro(build_catch_test arg)
include_directories("${PROJECT_SOURCE_DIR}/test") include_directories("${PROJECT_SOURCE_DIR}/test")
add_executable( ${arg} "${arg}.cpp" $<TARGET_OBJECTS:TestMain>) add_executable( ${arg} "${arg}.cpp" $<TARGET_OBJECTS:TestMain>)
target_link_libraries( ${arg} target_link_libraries( ${arg}
pfabric_core pfabric_core
${Boost_SYSTEM_LIBRARY} ${Boost_SYSTEM_LIBRARY}
${ROCKSDB_LIB} ${ROCKSDB_LIB}
......
...@@ -17,40 +17,40 @@ ...@@ -17,40 +17,40 @@
* along with PipeFabric. If not, see <http://www.gnu.org/licenses/>. * along with PipeFabric. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "Dataflow.hpp" #include "Dataflow.hpp"
using namespace pfabric; using namespace pfabric;
Dataflow::BaseOpIterator Dataflow::addPublisher(Dataflow::BaseOpPtr op) { Dataflow::BaseOpIterator Dataflow::addPublisher(Dataflow::BaseOpPtr op) {
publishers.push_back(op); publishers.push_back(op);
auto iter = publishers.end(); auto iter = publishers.end();
return --iter; return --iter;
} }
Dataflow::BaseOpIterator Dataflow::addPublisherList(const Dataflow::BaseOpList& lst) { Dataflow::BaseOpIterator Dataflow::addPublisherList(const Dataflow::BaseOpList& lst) {
publishers.insert(publishers.end(), lst.begin(), lst.end()); publishers.insert(publishers.end(), lst.begin(), lst.end());
auto iter = publishers.end(); auto iter = publishers.end();
std::advance(iter, -lst.size()); std::advance(iter, -lst.size());
return iter; return iter;
} }
void Dataflow::addSink(Dataflow::BaseOpPtr op) { sinks.push_back(op); } void Dataflow::addSink(Dataflow::BaseOpPtr op) { sinks.push_back(op); }
/** /**
* @brief Returns the operator at the end of the publisher list. * @brief Returns the operator at the end of the publisher list.
* *
* Returns the operator which acts as the publisher for the next * Returns the operator which acts as the publisher for the next
* added operator. * added operator.
* *
* @return * @return
* the last operator in the publisher list * the last operator in the publisher list
*/ */
Dataflow::BaseOpPtr Dataflow::getPublisher() { return publishers.back(); } Dataflow::BaseOpPtr Dataflow::getPublisher() { return publishers.back(); }
Dataflow::BaseOpIterator Dataflow::getPublishers(unsigned int num) { Dataflow::BaseOpIterator Dataflow::getPublishers(unsigned int num) {
auto iter = publishers.end(); auto iter = publishers.end();
std::advance(iter, -num); std::advance(iter, -num);
return iter; return iter;
} }
std::size_t Dataflow::size() const { return publishers.size(); } std::size_t Dataflow::size() const { return publishers.size(); }
...@@ -17,10 +17,7 @@ ...@@ -17,10 +17,7 @@
* along with PipeFabric. If not, see <http://www.gnu.org/licenses/>. * along with PipeFabric. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <boost/thread.hpp>
#include <boost/chrono.hpp>
#include "Topology.hpp" #include "Topology.hpp"
#include "qop/ZMQSource.hpp"
using namespace pfabric; using namespace pfabric;
...@@ -74,14 +71,21 @@ void Topology::prepare() { ...@@ -74,14 +71,21 @@ void Topology::prepare() {
} }
} }
void Topology::wait() { void Topology::wait(const std::chrono::milliseconds &dur) {
if (!asyncStarted) if (!asyncStarted)
return; return;
std::lock_guard<std::mutex> guard(mMutex); std::lock_guard<std::mutex> guard(mMutex);
// let's wait until the function finished // let's wait until the function finished
for(auto &f : startupFutures) for(auto &f : startupFutures)
f.get(); f.wait();
//TODO: wait for EndOfStream Punctuations on all sinks
//TODO: what about merging streams or no actual sinks?
std::unique_lock<std::mutex> lk(mCv_m);
const auto now = std::chrono::system_clock::now();
if (mCv.wait_until(lk, now + dur) == std::cv_status::timeout) {
//Timeout!
}
} }
void Topology::runEvery(unsigned long secs) { void Topology::runEvery(unsigned long secs) {
...@@ -159,14 +163,16 @@ Pipe<TStringPtr> Topology::newStreamFromREST(unsigned int port, ...@@ -159,14 +163,16 @@ Pipe<TStringPtr> Topology::newStreamFromREST(unsigned int port,
return Pipe<TStringPtr>(dataflow, dataflow->addPublisher(op)); return Pipe<TStringPtr>(dataflow, dataflow->addPublisher(op));
} }
Pipe<TStringPtr> Topology::newAsciiStreamFromZMQ(const std::string& path, Pipe<TStringPtr> Topology::newAsciiStreamFromZMQ(const std::string& path, const std::string& syncPath,
ZMQParams::SourceType stype) { ZMQParams::SourceType stype) {
auto op = std::make_shared<ZMQSource<TStringPtr> >(path, stype); auto op = std::make_shared<ZMQSource<TStringPtr> >(path, syncPath, stype);
registerStartupFunction(std::bind(&ZMQSource<TStringPtr>::start, op.get()));
return Pipe<TStringPtr>(dataflow, dataflow->addPublisher(op)); return Pipe<TStringPtr>(dataflow, dataflow->addPublisher(op));
} }
Pipe<TBufPtr> Topology::newBinaryStreamFromZMQ(const std::string& path, Pipe<TBufPtr> Topology::newBinaryStreamFromZMQ(const std::string& path, const std::string& syncPath,
ZMQParams::SourceType stype) { ZMQParams::SourceType stype) {
auto op = std::make_shared<ZMQSource<TBufPtr> >(path, stype); auto op = std::make_shared<ZMQSource<TBufPtr> >(path, syncPath, stype);
registerStartupFunction(std::bind(&ZMQSource<TBufPtr>::start, op.get()));
return Pipe<TBufPtr>(dataflow, dataflow->addPublisher(op)); return Pipe<TBufPtr>(dataflow, dataflow->addPublisher(op));
} }
...@@ -20,12 +20,15 @@ ...@@ -20,12 +20,15 @@
#ifndef Topology_hpp_ #ifndef Topology_hpp_
#define Topology_hpp_ #define Topology_hpp_
#include <chrono>
#include <condition_variable>
#include <string> #include <string>
#include <list> #include <list>
#include <vector> #include <vector>
#include <future> #include <future>
#include <mutex> #include <mutex>
#include <chrono>
#include <boost/chrono.hpp>
#include <boost/thread.hpp> #include <boost/thread.hpp>
#include "core/Tuple.hpp" #include "core/Tuple.hpp"
...@@ -102,6 +105,8 @@ namespace pfabric { ...@@ -102,6 +105,8 @@ namespace pfabric {
std::vector<std::future<unsigned long> > startupFutures; //< futures for the startup functions std::vector<std::future<unsigned long> > startupFutures; //< futures for the startup functions
std::vector<boost::thread> wakeupTimers; //< interruptible threads for runEvery queries std::vector<boost::thread> wakeupTimers; //< interruptible threads for runEvery queries
std::mutex mMutex; //< mutex for accessing startupFutures std::mutex mMutex; //< mutex for accessing startupFutures
std::condition_variable mCv; //< condition variable to check if sinks have received EndOfStream
std::mutex mCv_m;
DataflowPtr dataflow; DataflowPtr dataflow;
...@@ -174,7 +179,7 @@ namespace pfabric { ...@@ -174,7 +179,7 @@ namespace pfabric {
* If the topology was started asynchronously the call of wait() * If the topology was started asynchronously the call of wait()
* blocks until the execution stopped. * blocks until the execution stopped.
*/ */
void wait(); void wait(const std::chrono::milliseconds &dur = 500ms);
/** /**
* @brief Creates a pipe from a TextFileSource as input. * @brief Creates a pipe from a TextFileSource as input.
...@@ -289,9 +294,11 @@ namespace pfabric { ...@@ -289,9 +294,11 @@ namespace pfabric {
* a new pipe where ZMQSource acts as a producer. * a new pipe where ZMQSource acts as a producer.
*/ */
Pipe<TStringPtr> newAsciiStreamFromZMQ(const std::string& path, Pipe<TStringPtr> newAsciiStreamFromZMQ(const std::string& path,
const std::string& syncPath = "",
ZMQParams::SourceType stype = ZMQParams::SubscriberSource); ZMQParams::SourceType stype = ZMQParams::SubscriberSource);
Pipe<TBufPtr> newBinaryStreamFromZMQ(const std::string& path, Pipe<TBufPtr> newBinaryStreamFromZMQ(const std::string& path,
const std::string& syncPath = "",
ZMQParams::SourceType stype = ZMQParams::SubscriberSource); ZMQParams::SourceType stype = ZMQParams::SubscriberSource);
/** /**
......
...@@ -86,8 +86,7 @@ TEST_CASE("Controlling stream processing by a barrier", "[Barrier]") { ...@@ -86,8 +86,7 @@ TEST_CASE("Controlling stream processing by a barrier", "[Barrier]") {
// set counter to 10 and send tuples 1, 2, 3, 4, 11, 12: // set counter to 10 and send tuples 1, 2, 3, 4, 11, 12:
// => only tuples 1, 2, 3, 4 should arrive // => only tuples 1, 2, 3, 4 should arrive
mockup->start(); mockup->start();
mockup->wait();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
REQUIRE(mockup->numTuplesProcessed() == 4); REQUIRE(mockup->numTuplesProcessed() == 4);
// now set counter to 13: // now set counter to 13:
...@@ -95,7 +94,7 @@ TEST_CASE("Controlling stream processing by a barrier", "[Barrier]") { ...@@ -95,7 +94,7 @@ TEST_CASE("Controlling stream processing by a barrier", "[Barrier]") {
mockup->addExpected({makeTuplePtr(11), makeTuplePtr(12)}); mockup->addExpected({makeTuplePtr(11), makeTuplePtr(12)});
counter.set(13); counter.set(13);
std::this_thread::sleep_for(std::chrono::milliseconds(100)); mockup->wait();
REQUIRE(mockup->numTuplesProcessed() == 6); REQUIRE(mockup->numTuplesProcessed() == 6);
// set counter to 25: // set counter to 25:
...@@ -103,6 +102,6 @@ TEST_CASE("Controlling stream processing by a barrier", "[Barrier]") { ...@@ -103,6 +102,6 @@ TEST_CASE("Controlling stream processing by a barrier", "[Barrier]") {
mockup->addExpected({makeTuplePtr(20), makeTuplePtr(21), makeTuplePtr(22)}); mockup->addExpected({makeTuplePtr(20), makeTuplePtr(21), makeTuplePtr(22)});
counter.set(25); counter.set(25);
std::this_thread::sleep_for(std::chrono::milliseconds(100)); mockup->wait();
REQUIRE(mockup->numTuplesProcessed() == 9); REQUIRE(mockup->numTuplesProcessed() == 9);
} }
...@@ -55,8 +55,8 @@ TEST_CASE("Stream from matrix", "[FromMatrixTest]") ...@@ -55,8 +55,8 @@ TEST_CASE("Stream from matrix", "[FromMatrixTest]")
for(auto &tuple : inputs) { for(auto &tuple : inputs) {
matrix->insert(tuple); matrix->insert(tuple);
} }
using namespace std::chrono_literals;
std::this_thread::sleep_for(2s); mockup->wait(2000ms);
REQUIRE(mockup->numTuplesProcessed() == size); REQUIRE(mockup->numTuplesProcessed() == size);
} }
...@@ -66,8 +66,7 @@ TEST_CASE("Producing a data stream from inserts into a table", "[FromTable]") { ...@@ -66,8 +66,7 @@ TEST_CASE("Producing a data stream from inserts into a table", "[FromTable]") {
testTable->insert(i, *tp); testTable->insert(i, *tp);
} }
std::this_thread::sleep_for(std::chrono::milliseconds(100)); mockup->wait();
REQUIRE(mockup->numTuplesProcessed() == 10); REQUIRE(mockup->numTuplesProcessed() == 10);
testTable->drop(); testTable->drop();
} }
...@@ -97,9 +97,7 @@ TEST_CASE("Partitioning a data stream and merging the results.", ...@@ -97,9 +97,7 @@ TEST_CASE("Partitioning a data stream and merging the results.",
CREATE_DATA_LINK(merge, mockup); CREATE_DATA_LINK(merge, mockup);
mockup->start(); mockup->start();
mockup->wait();
using namespace std::chrono_literals;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
REQUIRE(mockup->numTuplesProcessed() == numTuples / 2); REQUIRE(mockup->numTuplesProcessed() == numTuples / 2);
} }
...@@ -56,8 +56,7 @@ TEST_CASE("Decoupling producer and consumer via a queue", "[Queue]") { ...@@ -56,8 +56,7 @@ TEST_CASE("Decoupling producer and consumer via a queue", "[Queue]") {
CREATE_DATA_LINK(ch, mockup) CREATE_DATA_LINK(ch, mockup)
mockup->start(); mockup->start();
mockup->wait();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
REQUIRE(mockup->numTuplesProcessed() == expected.size()); REQUIRE(mockup->numTuplesProcessed() == expected.size());
} }
...@@ -17,65 +17,47 @@ ...@@ -17,65 +17,47 @@
* along with PipeFabric. If not, see <http://www.gnu.org/licenses/>. * along with PipeFabric. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <boost/filesystem.hpp>
#include <iostream>
#include <future> #include <future>
// using namespace boost::filesystem;
#include "catch.hpp" #include "catch.hpp"
#include "fmt/format.h" #include "fmt/format.h"
#include "SimpleWeb/client_http.hpp" #include "SimpleWeb/client_http.hpp"
#include "core/Tuple.hpp"
#include "qop/RESTSource.hpp" #include "qop/RESTSource.hpp"
#include "qop/DataSink.hpp" #include "StreamMockup.hpp"
#include "qop/OperatorMacros.hpp"
using namespace pfabric; using namespace pfabric;
using namespace ns_types; using namespace ns_types;
class TestConsumer : public SynchronizedDataSink<TStringPtr> { using HttpClient = SimpleWeb::Client<SimpleWeb::HTTP>;
public:
PFABRIC_SYNC_SINK_TYPEDEFS(TStringPtr)
TestConsumer() : tupleNum(0) {}
BIND_INPUT_CHANNEL_DEFAULT(InputDataChannel, TestConsumer, processDataElement);
BIND_INPUT_CHANNEL_DEFAULT(InputPunctuationChannel, TestConsumer, processPunctuation);
void processPunctuation(const PunctuationPtr& punctuation) {}
void processDataElement( const TStringPtr& data, const bool outdated ) {
std::string input (data->getAttribute<0>().begin_, data->getAttribute<0>().size_);
std::string expected = fmt::format("(\"key\": \"{0}\",\"value\": \"Always the same\")", tupleNum);
REQUIRE(input == expected); TEST_CASE("Receiving data via REST", "[RESTSource]" ) {
tupleNum++; constexpr auto numTuples = 1000;
std::vector<TStringPtr> expected;
std::vector<std::string> stringVec;
for (int i = 0; i < numTuples; i++) {
auto param_string = fmt::format("(\"key\": \"{0}\",\"value\": \"Always the same\")", i);
stringVec.push_back(std::move(param_string));
expected.push_back(makeTuplePtr(StringRef(stringVec[i].c_str(), stringVec[i].size())));
} }
private:
int tupleNum;
};
typedef SimpleWeb::Client<SimpleWeb::HTTP> HttpClient;
TEST_CASE("Receiving data via REST", "[RESTSource]" ) {
auto restSource = std::make_shared<RESTSource>(8099, "^/publish$", RESTSource::POST_METHOD); auto restSource = std::make_shared<RESTSource>(8099, "^/publish$", RESTSource::POST_METHOD);
auto consumer = std::make_shared<TestConsumer>(); auto mockup = std::make_shared<StreamMockup<TStringPtr, TStringPtr> >(expected, expected);
CREATE_LINK(restSource, consumer); CREATE_LINK(restSource, mockup);
// note we have to start the REST server asynchronously /// NOTE: we have to start the REST server asynchronously
auto handle = std::async(std::launch::async, [&](std::shared_ptr<RESTSource> src){ src->start(); }, restSource); auto handle = std::async(std::launch::async, [&restSource, &stringVec](){
restSource->start();
});
std::this_thread::sleep_for(std::chrono::milliseconds(100));
HttpClient client("localhost:8099"); HttpClient client("localhost:8099");
for (int i = 0; i < 100; i++) { for (int i = 0; i < numTuples; i++) {
std::string param_string = fmt::format("(\"key\": \"{0}\",\"value\": \"Always the same\")", i); auto res = client.request("POST", "/publish", stringVec[i]);
auto res = client.request("POST", "/publish", param_string);
} }
restSource->stop(); restSource->stop();
handle.get(); handle.get();
REQUIRE(mockup->numTuplesProcessed() == numTuples);
} }
...@@ -25,9 +25,11 @@ ...@@ -25,9 +25,11 @@
#include <boost/algorithm/string/find_iterator.hpp> #include <boost/algorithm/string/find_iterator.hpp>
#include <boost/algorithm/string.hpp> #include <boost/algorithm/string.hpp>
#include <vector> #include <chrono>
#include <mutex> #include <condition_variable>
#include <fstream> #include <fstream>
#include <mutex>
#include <vector>
#include "core/Tuple.hpp" #include "core/Tuple.hpp"
#include "qop/Map.hpp" #include "qop/Map.hpp"
...@@ -36,157 +38,171 @@ ...@@ -36,157 +38,171 @@
#include "qop/OperatorMacros.hpp" #include "qop/OperatorMacros.hpp"
using namespace std::chrono_literals;
namespace pfabric { namespace pfabric {
struct MockupHelper { struct MockupHelper {
template <typename StreamElement> template <typename StreamElement>
static void readTuplesFromStream(std::ifstream& in, std::vector<StreamElement>& vec) { static void readTuplesFromStream(std::ifstream& in, std::vector<StreamElement>& vec) {
std::string line; std::string line;
std::vector<std::string> data(StreamElementTraits<StreamElement>::NUM_ATTRIBUTES); std::vector<std::string> data(StreamElementTraits<StreamElement>::NUM_ATTRIBUTES);
while (getline(in, line)) { while (getline(in, line)) {
int i = 0; int i = 0;
for (boost::split_iterator<std::string::iterator> it = for (boost::split_iterator<std::string::iterator> it =
boost::make_split_iterator(line, boost::first_finder(",", boost::is_iequal())); boost::make_split_iterator(line, boost::first_finder(",", boost::is_iequal()));
it != boost::split_iterator<std::string::iterator>(); it++) { it != boost::split_iterator<std::string::iterator>(); it++) {
data[i++] = boost::copy_range<std::string>(*it); data[i++] = boost::copy_range<std::string>(*it);
} }
auto tp = StreamElementTraits<StreamElement>::create(data); auto tp = StreamElementTraits<StreamElement>::create(data);
vec.push_back(tp); vec.push_back(tp);
} }
} }
}; };
template< template<
typename InputStreamElement, typename InputStreamElement,
typename OutputStreamElement> typename OutputStreamElement>
class StreamMockup : class StreamMockup :
public DataSource< InputStreamElement >, public DataSource< InputStreamElement >,
public SynchronizedDataSink< OutputStreamElement > public SynchronizedDataSink< OutputStreamElement >
{ {
public: public:
typedef DataSource< InputStreamElement > SourceBase; typedef DataSource< InputStreamElement > SourceBase;
typedef typename SourceBase::OutputDataChannel OutputDataChannel; typedef typename SourceBase::OutputDataChannel OutputDataChannel;
typedef typename SourceBase::OutputPunctuationChannel OutputPunctuationChannel; typedef typename SourceBase::OutputPunctuationChannel OutputPunctuationChannel;
typedef typename SourceBase::OutputDataElementTraits OutputDataElementTraits; typedef typename SourceBase::OutputDataElementTraits OutputDataElementTraits;
typedef SynchronizedDataSink< OutputStreamElement > SinkBase; typedef SynchronizedDataSink< OutputStreamElement > SinkBase;
typedef typename SinkBase::InputDataChannel InputDataChannel; typedef typename SinkBase::InputDataChannel InputDataChannel;
typedef typename SinkBase::InputPunctuationChannel InputPunctuationChannel; typedef typename SinkBase::InputPunctuationChannel InputPunctuationChannel;
typedef typename SinkBase::InputDataElementTraits InputDataElementTraits; typedef typename SinkBase::InputDataElementTraits InputDataElementTraits;
typedef std::function<bool(const OutputStreamElement& lhs, const OutputStreamElement& rhs)> CompareFunc; typedef std::function<bool(const OutputStreamElement& lhs, const OutputStreamElement& rhs)> CompareFunc;
/** /**
* *
* @param input * @param input
* @param expected * @param expected
* @param ordered * @param ordered
* @param cFunc * @param cFunc
*/ */
StreamMockup(const std::vector<InputStreamElement>& input, StreamMockup(const std::vector<InputStreamElement>& input,
const std::vector<OutputStreamElement>& expected, bool ordered = true, CompareFunc cFunc = nullptr) : const std::vector<OutputStreamElement>& expected, bool ordered = true, CompareFunc cFunc = nullptr) :
inputTuples(input), expectedTuples(expected), tuplesProcessed(0), compareOrdered(ordered), inputTuples(input), expectedTuples(expected), tuplesProcessed(0), compareOrdered(ordered),
compareFunc(cFunc) { compareFunc(cFunc) {
if (!compareOrdered) if (!compareOrdered)
BOOST_ASSERT_MSG(compareFunc != nullptr, "no comparison predicated given."); BOOST_ASSERT_MSG(compareFunc != nullptr, "no comparison predicated given.");
} }
StreamMockup(const std::string& inputStream, const std::string& expectedStream) StreamMockup(const std::string& inputStream, const std::string& expectedStream)
: tuplesProcessed(0), compareOrdered(true) { : tuplesProcessed(0), compareOrdered(true) {
auto inputFile = std::string(TEST_DATA_DIRECTORY) + inputStream; auto inputFile = std::string(TEST_DATA_DIRECTORY) + inputStream;
std::ifstream input(inputFile); std::ifstream input(inputFile);
REQUIRE(input.is_open()); REQUIRE(input.is_open());
auto expectedFile = TEST_DATA_DIRECTORY + expectedStream; auto expectedFile = TEST_DATA_DIRECTORY + expectedStream;
std::ifstream expected(expectedFile); std::ifstream expected(expectedFile);
REQUIRE(expected.is_open()); REQUIRE(expected.is_open());
MockupHelper::readTuplesFromStream<InputStreamElement>(input, inputTuples); MockupHelper::readTuplesFromStream<InputStreamElement>(input, inputTuples);
MockupHelper::readTuplesFromStream<OutputStreamElement>(expected, expectedTuples); MockupHelper::readTuplesFromStream<OutputStreamElement>(expected, expectedTuples);
} }
void start() { void start() {
const bool outdated = false;