Commit 6b1616e7 authored by Kai-Uwe Sattler's avatar Kai-Uwe Sattler
Browse files

fix for crash in partitioned join

parent 98a0ebf9
......@@ -3,5 +3,6 @@ include(../../cmake/Testing.cmake.in)
if (BUILD_BENCHMARKS)
if (BUILD_GOOGLE_BENCH)
do_test(TopologyBenchmarks)
do_test(MemoryBenchmarks)
endif()
endif()
......@@ -229,9 +229,9 @@ BENCHMARK(TopologyPartitionedGroupByTest);
*Testing method six: partitioned join
*ERROR while testing: "double free or corruption (out)"
*/
/*void TopologyPartitionedJoinTest(benchmark::State& state) {
void TopologyPartitionedJoinTest(benchmark::State& state) {
typedef TuplePtr<Tuple<int, std::string, double> > T1;
typedef TuplePtr<int, std::string, double> T1;
TestDataGenerator tgen1("file1.csv");
tgen1.writeData(100);
......@@ -251,8 +251,8 @@ BENCHMARK(TopologyPartitionedGroupByTest);
.join<int>(s1, [](auto tp1, auto tp2) { return true; })
.merge();
t.start(false);
t.start();
t.wait();
//BAD: Takes far too long because of iteration number
//wait for results - stop timer
//state.PauseTiming();
......@@ -260,7 +260,7 @@ BENCHMARK(TopologyPartitionedGroupByTest);
//state.ResumeTiming();
}
}
BENCHMARK(TopologyPartitionedJoinTest);*/
BENCHMARK(TopologyPartitionedJoinTest);
//MAIN for benchmark tests
BENCHMARK_MAIN();
......@@ -54,6 +54,7 @@
#include "qop/TumblingWindow.hpp"
#include "qop/TupleDeserializer.hpp"
#include "qop/TupleExtractor.hpp"
#include "qop/Tuplifier.hpp"
#include "qop/Where.hpp"
#include "qop/ZMQSink.hpp"
......@@ -171,13 +172,11 @@ class Pipe {
return dataflow->addPublisher(op);
}
template <typename JoinOp, typename OtherSource>
template <typename T2, typename KeyType>
OpIterator addPartitionedJoin(
std::vector<std::shared_ptr<JoinOp>>& opList, OtherSource* otherOp,
std::vector<std::shared_ptr<SHJoin<T, T2, KeyType>>>& opList, DataSource<T2>* otherOp,
PartitioningState otherPartitioningState) throw(TopologyException) {
typedef typename PartitionBy<T>::InputDataChannel InputDataChannel;
typedef typename PartitionBy<T>::InputPunctuationChannel
InputPunctuationChannel;
typedef typename std::shared_ptr<SHJoin<T, T2, KeyType>> JoinOpPtr;
if (partitioningState == NoPartitioning)
throw TopologyException("Missing partitionBy operator in topology.");
......@@ -185,14 +184,14 @@ class Pipe {
auto partition = castOperator<PartitionBy<T>>(getPublisher());
for (auto i = 0u; i < opList.size(); i++) {
auto op = opList[i];
// auto op = opList[i];
JoinOpPtr op = opList[i];
// connect to left input channels
partition->connectChannelsForPartition(
i,
reinterpret_cast<InputDataChannel&>(op->getLeftInputDataChannel()),
reinterpret_cast<InputPunctuationChannel&>(
op->getInputPunctuationChannel()));
op->getLeftInputDataChannel(),
op->getInputPunctuationChannel());
// connect to right input channels
if (otherPartitioningState == NoPartitioning) {
connectChannels(otherOp->getOutputDataChannel(),
......@@ -833,6 +832,26 @@ class Pipe {
}
}
template <typename Tout>
Pipe<Tout> tuplify(const std::initializer_list<std::string>& predList, TuplifierParams::TuplifyMode m,
unsigned int ws = 0) throw(TopologyException) {
if (partitioningState == NoPartitioning) {
auto op = std::make_shared<Tuplifier<T, Tout>>(predList, m, ws);
auto iter = addPublisher<Tuplifier<T, Tout>, DataSource<T>>(op);
return Pipe<Tout>(dataflow, iter, keyExtractor, timestampExtractor,
partitioningState, numPartitions);
} else {
std::vector<std::shared_ptr<Tuplifier<T, Tout>>> ops;
for (auto i = 0u; i < numPartitions; i++) {
ops.push_back(std::make_shared<Tuplifier<T, Tout>>(predList, m, ws));
}
auto iter = addPartitionedPublisher<Tuplifier<T, Tout>, T>(ops);
return Pipe<Tout>(dataflow, iter, keyExtractor, timestampExtractor,
partitioningState, numPartitions);
}
}
/**
* @brief Creates a stateful map operator.
*
......@@ -1224,7 +1243,7 @@ class Pipe {
auto op = std::make_shared<SHJoin<T, T2, KeyType>>(fn1, fn2, pred);
ops.push_back(op);
}
auto iter = addPartitionedJoin<SHJoin<T, T2, KeyType>, DataSource<T2>>(
auto iter = addPartitionedJoin<T2, KeyType>(
ops, otherOp, otherPipe.partitioningState);
return Pipe<Tout>(dataflow, iter, keyExtractor, timestampExtractor,
partitioningState, numPartitions);
......@@ -1381,7 +1400,7 @@ class Pipe {
auto pOp = castOperator<DataSource<T>>(iter->get());
CREATE_LINK(pOp, op);
}
auto iter = dataflow->addPublisher(op);
dataflow->addPublisher(op);
// return Pipe(dataflow, iter, keyExtractor, timestampExtractor,
// partitioningState, numPartitions);
......
......@@ -126,8 +126,9 @@ public:
* @param dataChannel the input data channel of the operator associated with this partition
* @param punctuationChannel the input punctuation channel of the operator
*/
void connectChannelsForPartition(PartitionID id, InputDataChannel& dataChannel,
InputPunctuationChannel& punctuationChannel) {
template <typename DataChannel, typename PunctuationChannel>
void connectChannelsForPartition(PartitionID id, DataChannel& dataChannel,
PunctuationChannel& punctuationChannel) {
BOOST_ASSERT_MSG(id >= 0 && id < mNumPartitions, "invalid partition id");
// we decouple the channels by introducing a Queue operator which
// runs the consumer side within a separate thread
......
......@@ -204,7 +204,7 @@ namespace pfabric {
* @param punctuation the incoming punctuation tuple
*/
void processPunctuation( const PunctuationPtr& punctuation ) {
if (punctuation != nullptr)
if (punctuation != nullptr)
mQueue.push(std::make_tuple(punctuation->ptype(), nullptr, false));
}
......
......@@ -34,6 +34,19 @@
#include "qop/UnaryTransform.hpp"
namespace pfabric {
struct TuplifierParams {
/**
* An enumeration to specify the tuplifying mode
*/
enum TuplifyMode {
ORDERED, //< we assume triples arrive ordered on the subject
WINDOW, //< we maintain a time window and publish all tuples (including
//incomplete tuples) if they are outdated
PUNCTUATED, //< at a punctuation we publish all tuples received so far
COMPLETED //< as soon as a tuple is complete, we publish it
};
};
/**
* @brief this class provides an operator to transform a set of primtive tuples
* (triples)
......@@ -60,18 +73,7 @@ class Tuplifier
: public UnaryTransform<InputStreamElement, OutputStreamElement> {
PFABRIC_UNARY_TRANSFORM_TYPEDEFS(InputStreamElement, OutputStreamElement)
public:
/**
* An enumeration to specify the tuplifying mode
*/
enum TuplifyMode {
ORDERED, //< we assume triples arrive ordered on the subject
WINDOW, //< we maintain a time window and publish all tuples (including
//incomplete tuples) if they are outdated
PUNCTUATED, //< at a punctuation we publish all tuples received so far
COMPLETED //< as soon as a tuple is complete, we publish it
};
/**
/**
* A typedef for predicates list
*/
typedef std::vector<std::string> PredicateList;
......@@ -86,7 +88,7 @@ class Tuplifier
* @param m the tuplifying mode
* @param ws a window size for periodic notification (default = 0)
*/
Tuplifier(const std::initializer_list<std::string>& predList, TuplifyMode m, unsigned int ws = 0)
Tuplifier(const std::initializer_list<std::string>& predList, TuplifierParams::TuplifyMode m, unsigned int ws = 0)
: mode(m),
currentSubj(),
notifier(
......@@ -102,7 +104,7 @@ class Tuplifier
}
Tuplifier(TimestampExtractorFunc func,
const std::initializer_list<std::string>& predList, TuplifyMode m, unsigned int ws = 0) :
const std::initializer_list<std::string>& predList, TuplifierParams::TuplifyMode m, unsigned int ws = 0) :
Tuplifier(predList, m, ws),
mTimestampExtractor(func) {}
......@@ -141,7 +143,7 @@ class Tuplifier
*/
void processDataElement(const InputStreamElement& data,
const bool outdated = false) {
if (mode == ORDERED) {
if (mode == TuplifierParams::ORDERED) {
const std::string& subj = get<0>(*data);
if (currentSubj.empty() || subj == currentSubj) {
// add triple to buffer
......@@ -158,7 +160,7 @@ class Tuplifier
// just add to tuple to the buffer
addToBuffer(data);
if (mode == COMPLETED) {
if (mode == TuplifierParams::COMPLETED) {
// we try to publish all completed tuples
produceCompleteTuples();
}
......@@ -176,7 +178,7 @@ class Tuplifier
* (outdated == true)
*/
void processPunctuation(const PunctuationPtr& pp) {
if (mode == ORDERED) {
if (mode == TuplifierParams::ORDERED) {
produceTupleForSubject(currentSubj);
} else {
produceAllTuples();
......@@ -235,7 +237,7 @@ class Tuplifier
it->second.matches++;
it->second.tripleList.push_back(data);
} else {
const Timestamp ts = this->mTimestampExtractor(data);
const Timestamp ts = this->mTimestampExtractor == nullptr ? 0 : this->mTimestampExtractor(data);
BufferItem item(ts);
item.tripleList.push_back(data);
......@@ -310,7 +312,7 @@ class Tuplifier
tupleBuffer; //< a buffer for all received triples not yet published
PredicateMap predicates; //< a map containing all predicates and their
//position in the tuple
TuplifyMode mode; //< the mode for constructing tuples from triples
TuplifierParams::TuplifyMode mode; //< the mode for constructing tuples from triples
std::string currentSubj; //< the current subject in the triple stream (only
//useful for ordered)
std::unique_ptr<TriggerNotifier> notifier; //< the notifier object which
......
......@@ -42,6 +42,8 @@ if (BUILD_TEST_CASES)
do_test(PartitionTest)
do_test(BarrierTest)
do_test(StreamGeneratorTest)
do_test(TuplifierTest)
do_test(BPTreeTest)
if (USE_ROCKSDB_TABLE)
do_test(RocksDBTest)
......
......@@ -311,3 +311,22 @@ TEST_CASE("Combining tuples from two streams to one stream", "[ToStream]") {
REQUIRE(results == 200);
}
TEST_CASE("Tuplifying a stream of RDF strings", "[Tuplifier]") {
typedef TuplePtr<std::string, std::string, std::string> Triple;
typedef TuplePtr<std::string, std::string, std::string, std::string> RDFTuple;
std::vector<RDFTuple> results;
std::mutex r_mutex;
Topology t;
auto s = t.newStreamFromFile(std::string(TEST_DATA_DIRECTORY) + "tuplifier_test1.in")
.extract<Triple>(',')
.tuplify<RDFTuple>({ "http://data.org/name", "http://data.org/price", "http://data.org/someOther" },
TuplifierParams::ORDERED)
.notify([&](auto tp, bool outdated) {
std::lock_guard<std::mutex> lock(r_mutex);
results.push_back(tp);
});
t.start(false);
REQUIRE(results.size() == 3);
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment