Commit 3e688fd9 authored by Philipp Götze's avatar Philipp Götze
Browse files

Shortened Test cases waiting period

parent ed462b1c
......@@ -87,7 +87,7 @@ TEST_CASE("Controlling stream processing by a barrier", "[Barrier]") {
// => only tuples 1, 2, 3, 4 should arrive
mockup->start();
std::this_thread::sleep_for(1s);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
REQUIRE(mockup->numTuplesProcessed() == 4);
// now set counter to 13:
......@@ -95,7 +95,7 @@ TEST_CASE("Controlling stream processing by a barrier", "[Barrier]") {
mockup->addExpected({makeTuplePtr(11), makeTuplePtr(12)});
counter.set(13);
std::this_thread::sleep_for(1s);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
REQUIRE(mockup->numTuplesProcessed() == 6);
// set counter to 25:
......@@ -103,6 +103,6 @@ TEST_CASE("Controlling stream processing by a barrier", "[Barrier]") {
mockup->addExpected({makeTuplePtr(20), makeTuplePtr(21), makeTuplePtr(22)});
counter.set(25);
std::this_thread::sleep_for(1s);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
REQUIRE(mockup->numTuplesProcessed() == 9);
}
......@@ -66,8 +66,8 @@ TEST_CASE("Producing a data stream from inserts into a table", "[FromTable]") {
testTable->insert(i, *tp);
}
using namespace std::chrono_literals;
std::this_thread::sleep_for(2s);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
REQUIRE(mockup->numTuplesProcessed() == 10);
testTable->drop();
}
......@@ -99,7 +99,7 @@ TEST_CASE("Partitioning a data stream and merging the results.",
mockup->start();
using namespace std::chrono_literals;
std::this_thread::sleep_for(1s);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
REQUIRE(mockup->numTuplesProcessed() == numTuples / 2);
}
......@@ -57,8 +57,7 @@ TEST_CASE("Decoupling producer and consumer via a queue", "[Queue]") {
mockup->start();
using namespace std::chrono_literals;
std::this_thread::sleep_for(2s);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
REQUIRE(mockup->numTuplesProcessed() == expected.size());
}
......@@ -69,7 +69,7 @@ TEST_CASE("Receiving data via REST", "[RESTSource]" ) {
// note we have to start the REST server asynchronously
auto handle = std::async(std::launch::async, [&](std::shared_ptr<RESTSource> src){ src->start(); }, restSource);
std::this_thread::sleep_for(std::chrono::seconds(1));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
HttpClient client("localhost:8099");
for (int i = 0; i < 100; i++) {
std::string param_string = fmt::format("(\"key\": \"{0}\",\"value\": \"Always the same\")", i);
......
......@@ -94,7 +94,7 @@ TEST_CASE("Building and running a topology with partitioned aggregation",
t.start(false);
std::this_thread::sleep_for(2s);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
REQUIRE(results.size() == num);
}
......
......@@ -102,7 +102,7 @@ TEST_CASE("Building and running a topology with simple unpartitioned grouping",
t.start(false);
std::this_thread::sleep_for(2s);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
REQUIRE(results.size() == num);
......@@ -159,7 +159,7 @@ TEST_CASE("Building and running a topology with unpartitioned grouping",
t.start(false);
std::this_thread::sleep_for(2s);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
REQUIRE(results.size() == num);
......@@ -228,7 +228,7 @@ TEST_CASE("Building and running a topology with partitioned grouping",
t.start(false);
std::this_thread::sleep_for(2s);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
REQUIRE(results.size() == num);
......
......@@ -58,9 +58,8 @@ TEST_CASE("Building and running a topology with ScaleJoin (3 instances)", "[Scal
t.prepare();
t.start(false);
while(results!=num) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
REQUIRE(results == num);
}
......@@ -136,12 +135,12 @@ TEST_CASE("Building and running a topology with joins on different tuple formats
}
//Symmetric Hash Join with the left data stream partitioned
TEST_CASE("Building and running a topology with a join on one partitioned stream",
TEST_CASE("Building and running a topology with a join on one partitioned stream",
"[Partitioned and unpartitioned Join]") {
typedef TuplePtr<unsigned long, unsigned long> MyTuplePtr;
StreamGenerator<MyTuplePtr>::Generator streamGen ([](unsigned long n) -> MyTuplePtr {
return makeTuplePtr(n, n % 100);
return makeTuplePtr(n, n % 100);
});
unsigned long num = 1000;
......@@ -168,7 +167,7 @@ TEST_CASE("Building and running a topology with a join on one partitioned stream
t.start(false);
std::this_thread::sleep_for(1s);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
REQUIRE(results.size() == num*10);
......@@ -210,7 +209,7 @@ TEST_CASE("Building and running a topology with a join on another partitioned st
t.start(false);
std::this_thread::sleep_for(1s);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
REQUIRE(results.size() == num*10);
......@@ -251,7 +250,7 @@ TEST_CASE("Building and running a topology with a join on another partitioned st
t.start(false);
std::this_thread::sleep_for(1s);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
REQUIRE(results.size() == num*10);
......@@ -266,7 +265,7 @@ TEST_CASE("Building and running a topology with a join on two partitioned stream
typedef TuplePtr<unsigned long, unsigned long> MyTuplePtr;
StreamGenerator<MyTuplePtr>::Generator streamGen ([](unsigned long n) -> MyTuplePtr {
return makeTuplePtr(n, n % 100);
return makeTuplePtr(n, n % 100);
});
unsigned long num = 1000;
......@@ -294,7 +293,7 @@ TEST_CASE("Building and running a topology with a join on two partitioned stream
t.start(false);
std::this_thread::sleep_for(1s);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
REQUIRE(results.size() == num*10);
......
......@@ -82,8 +82,7 @@ TEST_CASE("Building and running a topology with ZMQ", "[Topology]") {
t.start(false);
using namespace std::chrono_literals;
std::this_thread::sleep_for(1s);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
auto handle = std::async(std::launch::async, [&publisher](){
std::vector<std::string> input = {
......@@ -97,7 +96,7 @@ TEST_CASE("Building and running a topology with ZMQ", "[Topology]") {
});
handle.get();
std::this_thread::sleep_for(2s);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::string expected = "0,10\n1,11\n2,12\n3,13\n4,14\n5,15\n";
......@@ -155,10 +154,9 @@ TEST_CASE("Building and running a topology with partitioning", "[Topology]") {
results.push_back(v);
});
t.start();
t.start(false);
using namespace std::chrono_literals;
std::this_thread::sleep_for(2s);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
REQUIRE(results.size() == 500);
......@@ -225,8 +223,7 @@ TEST_CASE("Building and running a topology with batcher", "[Topology]") {
t2.start(false);
using namespace std::chrono_literals;
std::this_thread::sleep_for(2s);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
REQUIRE(procBatchCount == 100);
REQUIRE(procTupleCount == 1000);
......@@ -368,11 +365,9 @@ TEST_CASE("Combining tuples from two streams to one stream", "[ToStream]") {
results++;
});
t.start();
t.wait();
t.start(false);
using namespace std::chrono_literals;
std::this_thread::sleep_for(2s);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
REQUIRE(results == 200);
}
......
......@@ -63,15 +63,14 @@ TEST_CASE("Transfer a binary tuple stream via ZMQ", "[ZMQSource][ZMQSink]") {
CREATE_DATA_LINK(src, deserializer);
CREATE_DATA_LINK(deserializer, mockup);
using namespace std::chrono_literals;
//std::this_thread::sleep_for(2s);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
auto handle = std::async(std::launch::async, [&mockup](){
mockup->start();
});
handle.get();
std::this_thread::sleep_for(2s);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
src->stop();
REQUIRE(mockup->numTuplesProcessed() == numTuples);
}
......@@ -46,7 +46,7 @@ TEST_CASE("Receiving a ascii tuple stream via ZMQSource", "[ZMQSource]") {
typedef ConsoleWriter< MyTuplePtr > TestWriter;
zmq::context_t context (1);
zmq::socket_t publisher (context, ZMQ_PUB);
zmq::socket_t publisher(context, ZMQ_PUB);
publisher.bind("tcp://*:5678");
auto src = std::make_shared< TestZMQSource > ("tcp://localhost:5678");
......@@ -60,22 +60,21 @@ TEST_CASE("Receiving a ascii tuple stream via ZMQSource", "[ZMQSource]") {
auto writer = std::make_shared< TestWriter >(strm, formatter);
CREATE_DATA_LINK(extractor, writer);
using namespace std::chrono_literals;
std::this_thread::sleep_for(1s);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
auto handle = std::async(std::launch::async, [&publisher](){
std::vector<std::string> input = {
"0|10", "1|11", "2|12", "3|13", "4|14", "5|15"
};
for(const std::string &s : input) {
zmq::message_t request (4);
memcpy (request.data (), s.c_str(), 4);
zmq::message_t request(s.length());
memcpy(request.data(), s.c_str(), s.length());
publisher.send(request, zmq::send_flags::none);
}
});
handle.get();
std::this_thread::sleep_for(2s);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
src->stop();
std::string expected = "0,10\n1,11\n2,12\n3,13\n4,14\n5,15\n";
REQUIRE(strm.str() == expected);
......@@ -104,8 +103,7 @@ TEST_CASE("Receiving a binary tuple stream via ZMQSource", "[ZMQSource]") {
auto writer = std::make_shared< TestWriter >(strm, formatter);
CREATE_DATA_LINK(extractor, writer);
using namespace std::chrono_literals;
std::this_thread::sleep_for(1s);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
auto handle = std::async(std::launch::async, [&publisher](){
std::vector<MyTuplePtr> input = {
......@@ -126,7 +124,7 @@ TEST_CASE("Receiving a binary tuple stream via ZMQSource", "[ZMQSource]") {
});
handle.get();
std::this_thread::sleep_for(2s);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
src->stop();
std::string expected = "0,10\n1,11\n2,12\n3,13\n4,14\n5,15\n";
REQUIRE(strm.str() == expected);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment