Commit 83010507 authored by Constantin Pohl's avatar Constantin Pohl
Browse files

added ScaleJoin operator with test case, removed unnecessary locks from SHJoin

parent a5b588ee
......@@ -57,6 +57,7 @@
#include "qop/Tuplifier.hpp"
#include "qop/Where.hpp"
#include "qop/ZMQSink.hpp"
#include "qop/ScaleJoin.hpp"
namespace pfabric {
......@@ -1257,6 +1258,101 @@ class Pipe {
}
}
/**
* @brief Creates an operator for joining two streams represented by pipes.
* Origin idea & paper: "ScaleJoin: a Deterministic, Disjoint-Parallel and
* Skew-Resilient Stream Join" (2016)
*
* Creates an operator implementing a ScaleJoin to join two streams.
* In addition a join predicate can be specified. Note, that the output
* tuple type is derived from the two input types.
*
* @tparam T
* the input tuple type (usually a TuplePtr) of the left stream.
* @tparam T2
* the input tuple type (usually a TuplePtr) of the right stream.
* @tparam KeyType
* the data type for representing keys (join values)
* @param[in] otherPipe
* the pipe representing the right stream
* @param[in] pred
* the join predicate
* @param[in] threadnum
* the number of threads for parallel joining
* @return a new pipe
*/
template <typename KeyType = DefaultKeyType, typename T2>
Pipe<typename ScaleJoin<T, T2, KeyType>::ResultElement> scaleJoin(
Pipe<T2>& otherPipe, typename ScaleJoin<T, T2, KeyType>::JoinPredicateFunc pred, const int threadnum)
throw(TopologyException) {
typedef typename ScaleJoin<T, T2, KeyType>::ResultElement Tout;
try {
typedef std::function<KeyType(const T&)> LKeyExtractorFunc;
typedef std::function<KeyType(const T2&)> RKeyExtractorFunc;
//specify the keys of tuples
LKeyExtractorFunc fn1 = boost::any_cast<LKeyExtractorFunc>(keyExtractor);
RKeyExtractorFunc fn2 = boost::any_cast<RKeyExtractorFunc>(otherPipe.keyExtractor);
//get the sources of tuples of last operator before scaleJoin-operator (left and right stream)
auto pOp = castOperator<DataSource<T> >(getPublisher());
auto otherOp = castOperator<DataSource<T2> >(otherPipe.getPublisher());
//partitioning not necessary, already multithreaded join
assert(partitioningState == NoPartitioning);
assert(otherPipe.partitioningState == NoPartitioning);
assert(threadnum > 0);
//vector for join operators as well as queues (multithreading encoupling)
std::vector<std::shared_ptr<ScaleJoin<T, T2, KeyType> > > scJoinVec;
std::vector<std::shared_ptr<Queue<T> > > scQueueVec;
//queue for collecting join results, forwarding as a single stream
auto combine = std::make_shared<Queue<Tout>>();
//start thread instances, specified by threadnum
for (auto i=0; i<threadnum; i++) {
//create queue and scaleJoin instances
auto qu = std::make_shared<Queue<T>>();
auto scJoin = std::make_shared<ScaleJoin<T, T2, KeyType> >(fn1, fn2, pred, i, threadnum);
//connect output of predecessing operator of left stream with input of the current queue instance
CREATE_LINK(pOp, qu);
//connect output of queue instance with input of scaleJoin instance
connectChannels(qu->getOutputDataChannel(), scJoin->getLeftInputDataChannel());
connectChannels(qu->getOutputPunctuationChannel(), scJoin->getInputPunctuationChannel());
//connect output of predecessing operator of right stream with input of scaleJoin instance
connectChannels(otherOp->getOutputDataChannel(), scJoin->getRightInputDataChannel());
connectChannels(otherOp->getOutputPunctuationChannel(), scJoin->getInputPunctuationChannel());
//connect output of current scaleJoin instance with the combining queue operator
CREATE_LINK(scJoin, combine);
//add queue and scaleJoin instance to the vectors
scQueueVec.push_back(qu);
scJoinVec.push_back(scJoin);
}
//add all queues, scaleJoins and the combining queue to the dataflow
Dataflow::BaseOpList scQueueList(scQueueVec.begin(), scQueueVec.end());
dataflow->addPublisherList(scQueueList);
Dataflow::BaseOpList scJoinList(scJoinVec.begin(), scJoinVec.end());
dataflow->addPublisherList(scJoinList);
auto iter = dataflow->addPublisher(combine);
//return the pipe
return Pipe<Tout>(dataflow, iter, keyExtractor, timestampExtractor, partitioningState, numPartitions);
} catch (boost::bad_any_cast& e) {
throw TopologyException("No KeyExtractor defined for join.");
}
}
/*--------------------------- table operators -------------------------*/
/**
......
......@@ -21,9 +21,6 @@
#ifndef SHJoin_hpp_
#define SHJoin_hpp_
#include <boost/core/ignore_unused.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/lock_guard.hpp>
#include <boost/unordered/unordered_map.hpp>
#include "qop/BinaryTransform.hpp"
......@@ -82,12 +79,6 @@ namespace pfabric {
/// the join algorithm to be used for concatenating the input elements
typedef ElementJoinTraits< ElementJoinImpl > ElementJoin;
/// a mutex for protecting join processing from concurrent sources
typedef boost::mutex JoinMutex;
/// a scoped lock for the mutex
typedef boost::lock_guard< JoinMutex > Lock;
public:
......@@ -139,11 +130,10 @@ namespace pfabric {
* flag indicating whether the tuple is new or invalidated now
*/
void processLeftDataElement( const LeftInputStreamElement& left, const bool outdated ) {
Lock lock( mMtx );
// 1. insert the tuple in the corresponding hash table or remove it if outdated
auto keyval = mLKeyExtractor( left );
updateHashTable( mLTable, keyval, left, outdated, lock );
updateHashTable( mLTable, keyval, left, outdated);
// 2. find join partners in the other hash table
auto rightEqualElements = mRTable.equal_range(keyval);
......@@ -165,11 +155,10 @@ namespace pfabric {
* flag indicating whether the tuple is new or invalidated now
*/
void processRightDataElement( const RightInputStreamElement& right, const bool outdated ) {
Lock lock( mMtx );
// 1. insert the tuple in the corresponding hash table or remove it if outdated
auto keyval = mRKeyExtractor( right );
updateHashTable( mRTable, keyval, right, outdated, lock );
updateHashTable( mRTable, keyval, right, outdated);
// 2. find join partners in the other hash table
auto leftEqualElements = mLTable.equal_range( keyval );
......@@ -215,8 +204,7 @@ namespace pfabric {
typename StreamElement
>
static void updateHashTable( HashTable& hashTable, const key_t& key,
const StreamElement& newElement, const bool outdated, const Lock& lock ) {
boost::ignore_unused( lock );
const StreamElement& newElement, const bool outdated) {
if( !outdated ) {
hashTable.insert( { key, newElement });
......@@ -268,7 +256,6 @@ namespace pfabric {
JoinPredicateFunc mJoinPredicate; //< a pointer to the function implementing the join predicate
LKeyExtractorFunc mLKeyExtractor; //< hash function for the lhs stream
RKeyExtractorFunc mRKeyExtractor; //< hash function for the rhs stream
mutable JoinMutex mMtx; //< mutex for synchronizing access to the hash tables
};
} /* end namespace pfabric */
......
/*
* Copyright (c) 2014-17 The PipeFabric team,
* All Rights Reserved.
*
* This file is part of the PipeFabric package.
*
* PipeFabric is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License (GPL) as
* published by the Free Software Foundation; either version 2 of
* the License, or (at your option) any later version.
*
* This package is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; see the file LICENSE.
* If not you can find the GPL at http://www.gnu.org/copyleft/gpl.html
*/
#ifndef ScaleJoin_hpp_
#define ScaleJoin_hpp_
#include <unordered_map>
#include "qop/BinaryTransform.hpp"
#include "ElementJoinTraits.hpp"
#include "DefaultElementJoin.hpp"
namespace pfabric {
/**
* \brief An operator implementing a ScaleJoin.
* Origin idea & paper: "ScaleJoin: a Deterministic, Disjoint-Parallel and Skew-Resilient Stream Join" (2016)
*
* The ScaleJoin operator joins two input streams on a given join predicate. Each ScaleJoin instance gets all
* tuples from both streams, but stores only the tuples belonging to it's ID. Therefore each incoming tuple is
* only stored once in one of the ScaleJoin instances, reducing the overall memory usage.
*
* @tparam LeftInputStreamElement
* the data stream element type from the left source
* @tparam RightInputStreamElement
* the data stream element type from the right source
* @tparam ElementJoinImpl
* the actual join algorithm to be used for joining two input elements
*/
template<
typename LeftInputStreamElement,
typename RightInputStreamElement,
typename KeyType = DefaultKeyType,
typename ElementJoinImpl = DefaultElementJoin< LeftInputStreamElement, RightInputStreamElement >
>
class ScaleJoin : public BinaryTransform<LeftInputStreamElement, RightInputStreamElement,
typename ElementJoinTraits< ElementJoinImpl >::ResultElement>{
private:
PFABRIC_BINARY_TRANSFORM_TYPEDEFS(LeftInputStreamElement, RightInputStreamElement, typename ElementJoinTraits< ElementJoinImpl >::ResultElement);
public:
//typedef for the key extractor functions
typedef std::function<KeyType(const LeftInputStreamElement&)> LKeyExtractorFunc;
typedef std::function<KeyType(const RightInputStreamElement&)> RKeyExtractorFunc;
//typedef for the pointer to a function implementing the join predicate
typedef std::function< bool(const LeftInputStreamElement&, const RightInputStreamElement&) > JoinPredicateFunc;
private:
//the type definition for the hash tables - because of allowing stream elements to have the same key, a multimap is necessary
typedef std::unordered_multimap< KeyType, LeftInputStreamElement > LHashTable;
typedef std::unordered_multimap< KeyType, RightInputStreamElement > RHashTable;
//the join algorithm to be used for concatenating the input elements
typedef ElementJoinTraits< ElementJoinImpl > ElementJoin;
public:
//the join result for two input elements
typedef typename ElementJoin::ResultElement ResultElement;
/**
* Constructs a new ScaleJoin operator subscribing to two source operators producing the
* left and right hand-side input data streams.
*
* \param lhs_hash function pointer to the hash function for tuples of the lhs stream
* \param rhs_hash function pointer to the hash function for tuples of the rhs stream
* \param join_pred function pointer to a join predicate
* \param id unique id for the thread, used for deciding which tuples to store
* \param numThreads amount of ScaleJoin instances (threads)
*/
ScaleJoin(LKeyExtractorFunc lKeyFunc, RKeyExtractorFunc rKeyFunc, JoinPredicateFunc joinPred, const int id, const int numThreads) :
mJoinPredicate(joinPred), mLKeyExtractor(lKeyFunc), mRKeyExtractor(rKeyFunc), mID(id), mThreadnum(numThreads) {
//initialize counters to zero
mLCntr = mRCntr = mLOCntr = mROCntr = 0;
}
//bind the callback for the left handside data channel
BIND_INPUT_CHANNEL_DEFAULT(LeftInputChannel, ScaleJoin, processLeftDataElement);
//bind the callback for the right data channel
BIND_INPUT_CHANNEL_DEFAULT(RightInputChannel, ScaleJoin, processRightDataElement);
//bind the callback for the punctuation channel
BIND_INPUT_CHANNEL_DEFAULT(InputPunctuationChannel, ScaleJoin, processPunctuation);
private:
/**
* @brief This method is invoked when a data stream element arrives from the left input channel.
*
* The element is inserted into the corresponding hash table if (according to the ID) this ScaleJoin
* instance is responsible for storing the tuple. However, it always tries to join it with elements
* from the other hash table.
*
* @param[in] left
* the incoming stream element from the left input channel
* @param[in] outdated
* flag indicating whether the tuple is new or invalidated now
*/
void processLeftDataElement(const LeftInputStreamElement& left, const bool outdated) {
//extract the key from the tuple
auto keyval = mLKeyExtractor(left);
//if tuple is outdated
if(outdated) {
//if left outdated counter matches own ID, this instance is responsible for storing the tuple,
//therefore it should have seen it before, so it can be removed from the hash table
if(mLOCntr==mID) {
updateHashTable(mLTable, keyval, left, outdated);
}
//increase counter for left outdated tuples and verify that the counter is not higher than the
//maximum number of ScaleJoin instances (round robin principle)
mLOCntr++;
mLOCntr%=mThreadnum;
//if tuple is not outdated
} else {
//if left tuple counter matches own ID, this instance is responsible for storing the tuple,
//so it is inserted into the left hash table
if(mLCntr==mID) {
updateHashTable(mLTable, keyval, left, outdated);
}
//increase counter for left tuples and verify that the counter is not higher than the
//maximum number of ScaleJoin instances (round robin principle)
mLCntr++;
mLCntr%=mThreadnum;
}
//try to find match in the right hash table
auto rightEqualElements = mRTable.equal_range(keyval);
//for all matching tuples, join them
for (auto rightElementEntry = rightEqualElements.first; rightElementEntry != rightEqualElements.second; rightElementEntry++) {
joinTuples(left, rightElementEntry->second, outdated);
}
}
/**
* @brief This method is invoked when a data stream element arrives from the right input channel.
*
* The element is inserted into the corresponding hash table if (according to the ID) this ScaleJoin
* instance is responsible for storing the tuple. However, it always tries to join it with elements
* from the other hash table.
*
* @param[in] right
* the incoming stream element from the right input channel
* @param[in] outdated
* flag indicating whether the tuple is new or invalidated now
*/
void processRightDataElement(const RightInputStreamElement& right, const bool outdated) {
//extract the key from the tuple
auto keyval = mRKeyExtractor(right);
//if tuple is outdated
if(outdated) {
//if right outdated counter matches own ID, this instance is responsible for storing the tuple,
//therefore it should have seen it before, so it can be removed from the hash table
if(mROCntr==mID) {
updateHashTable(mRTable, keyval, right, outdated);
}
//increase counter for right outdated tuples and verify that the counter is not higher than the
//maximum number of ScaleJoin instances (round robin principle)
mROCntr++;
mROCntr%=mThreadnum;
//if tuple is not outdated
} else {
//if right tuple counter matches own ID, this instance is responsible for storing the tuple,
//so it is inserted into the right hash table
if(mRCntr==mID) {
updateHashTable(mRTable, keyval, right, outdated);
}
//increase counter for right tuples and verify that the counter is not higher than the
//maximum number of ScaleJoin instances (round robin principle)
mRCntr++;
mRCntr%=mThreadnum;
}
//try to find match in the left hash table
auto leftEqualElements = mLTable.equal_range(keyval);
//for all matching tuples, join them
for (auto leftElementEntry = leftEqualElements.first; leftElementEntry != leftEqualElements.second; leftElementEntry++) {
joinTuples(leftElementEntry->second, right, outdated);
}
}
/**
* @brief This method is invoked when a punctuation arrives.
*
* It simply forwards the punctuation to the subscribers.
*
* @param[in] punctuation
* the incoming punctuation tuple
*/
void processPunctuation(const PunctuationPtr& punctuation) {
this->getOutputPunctuationChannel().publish(punctuation);
}
/**
* @brief Update a hash table for a new input element.
*
* @tparam HashTable
* the type of the hash table to be updated
* @param[in] hashTable
* reference to the hash table to be updated
* @param[in] key
* the hash key for the new element
* @param[in] newElement
* the new element
* @param[in] outdated
* flag indicating whether the tuple is new or invalidated now
*/
template<typename HashTable, typename StreamElement>
static void updateHashTable(HashTable& hashTable, const key_t& key,
const StreamElement& newElement, const bool outdated) {
//if not outdated, just insert it into the hash table
if(!outdated) {
hashTable.insert({key, newElement});
//if outdated
} else {
//get all tuples with the same key
auto equalElements = hashTable.equal_range(key);
//pointer to first element
auto equalElementEntry = equalElements.first;
//run through all tuples, remove matching tuples from hash table
while (equalElementEntry != equalElements.second) {
const auto& equalElement = equalElementEntry->second;
if(elementsEqual(newElement, equalElement)) {
equalElementEntry = hashTable.erase(equalElementEntry);
} else {
equalElementEntry++;
}
}//end while
}//end outdated
}
/**
* @brief Join two tuples and publish the result.
*
* This method joins two input tuples and produces a result if the join predicate matches.
*
* @param[in] left
* the tuple from the left handside of the join
* @param[in] right
* the tuple from the right handside of the join
* @param[in] outdated
* flag indicating whether the tuple is new or invalidated now
*/
void joinTuples(const LeftInputStreamElement& left, const RightInputStreamElement& right,
const bool outdated) {
//if join predicate matches
if(mJoinPredicate(left, right)) {
//execute join
ResultElement joinedTuple = ElementJoin::joinElements(left, right);
//publish to following operator
this->getOutputDataChannel().publish(joinedTuple, outdated);
}
}
LHashTable mLTable; //hash table for the left stream
RHashTable mRTable; //hash table for the right stream
JoinPredicateFunc mJoinPredicate; //pointer to the function implementing the join predicate
LKeyExtractorFunc mLKeyExtractor; //function for extracting the key of the left stream
RKeyExtractorFunc mRKeyExtractor; //function for extracting the key of the right stream
const int mID; //unique ID of this ScaleJoin instance
const int mThreadnum; //number of all ScaleJoin instances
short mLCntr, mRCntr, mLOCntr, mROCntr; //counters for left and right stream tuples (+outdated)
};
}
#endif
......@@ -19,6 +19,33 @@
using namespace pfabric;
using namespace ns_types;
TEST_CASE("Building and running a topology with ScaleJoin (3 instances)", "[ScaleJoin]") {
typedef TuplePtr<int, std::string, double> tPtr;
unsigned short num = 100;
TestDataGenerator tgen1("file.csv");
tgen1.writeData(num);
unsigned int results = 0;
Topology t;
auto s1 = t.newStreamFromMemory<tPtr>("file.csv")
.keyBy<0>();
auto s2 = t.newStreamFromMemory<tPtr>("file.csv")
.keyBy<0>()
.scaleJoin(s1, [](auto tp1, auto tp2) { return get<0>(tp1) == get<0>(tp2); }, 3)
.notify([&](auto tp, bool outdated) { results++; });
t.prepare();
t.start(false);
while(results!=num) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
REQUIRE(results == num);
}
TEST_CASE("Building and running a topology with joins", "[Unpartitioned Join]") {
typedef TuplePtr<int, std::string, double> T1;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment