Commit 3f4e53b3 authored by Philipp Götze's avatar Philipp Götze
Browse files

Merge branch 'master' into wip/nvml

parents 0c8e1607 a5b588ee
......@@ -326,11 +326,12 @@ auto s = t.newStreamFromFile("data.csv")
#### slidingWindow ####
`Pipe<T> Pipe::slidingWindow(WindowParams::WinType wt, unsigned int sz, unsigned int ei)`
`Pipe<T> Pipe::slidingWindow(WindowParams::WinType wt, unsigned int sz, WindowOpFunc windowFunc, unsigned int ei)`
This operator defines a sliding window of the given type and size on the stream. The window type `wt` can be
row (count-based) or range (time-based) for which `sz` specifies the size (in tuples or milliseconds).
In case of a range (time-based) window, the `assignTimestamps` operator has to be defined on the stream beforehand.
row (count-based) or range (time-based) for which `sz` specifies the size (in tuples or milliseconds).
In case of a range (time-based) window, the `assignTimestamps` operator has to be defined on the stream beforehand.
The optional `windowFunc` parameter can be used with a lambda function to modify each incoming tuple of the window.
The optional parameter `ei` denotes the eviction interval, i.e. the time interval (in milliseconds) for
triggering the eviction of tuples from the window.
In the following example a time-based sliding window of 60 seconds is created. The timestamp of the tuples is
......@@ -347,10 +348,11 @@ auto s = s.createStreamFromFile()
#### tumblingWindow ####
`Pipe<T> Pipe::tumblingWindow(WindowParams::WinType wt, unsigned int sz)`
`Pipe<T> Pipe::tumblingWindow(WindowParams::WinType wt, unsigned int sz, WindowOpFunc windowFunc)`
The `tumblingWindow` operator creates a row or range-based tumbling window of the given size `sz`. In
contrast to a sliding window a tumbling window invalidates all tuples as soon as the window is completely
The `tumblingWindow` operator creates a row or range-based tumbling window of the given size `sz`.
The optional `windowFunc` parameter can be used with a lambda function to modify each incoming tuple of the window.
In contrast to a sliding window a tumbling window invalidates all tuples as soon as the window is completely
filled - either by its size (row) or time difference of the oldest and most recent tuple. As in
`slidingWindow`, a range-based window requires the timestamp column to be specified with `assignTimestamps`.
The following example code creates a tumbling window that outdates the tuples after every 100 processed
......
......@@ -37,6 +37,12 @@ PFabricContext::TopologyPtr PFabricContext::createTopology() {
return std::make_shared<Topology>();
}
bool PFabricContext::tableExists(const std::string& tblName) const {
  // the table exists iff a lookup by name in the table set yields an entry
  return mTableSet.find(tblName) != mTableSet.end();
}
TableInfoPtr PFabricContext::getTableInfo(const std::string& tblName) {
// look for the table
auto it = mTableSet.find(tblName);
......
......@@ -149,6 +149,8 @@ public:
return std::shared_ptr<Table<RecordType, KeyType>>();
}
bool tableExists(const std::string& tblName) const;
TableInfoPtr getTableInfo(const std::string& tblName);
/**
......
......@@ -371,14 +371,17 @@ class Pipe {
* the type of the window (row or range)
* @param[in] sz
* the window size (in number of tuples for row window or in milliseconds
* for range
* for range
* windows)
* @param[in] windowFunc
* optional function applied on each incoming tuple
* @param[in] ei
* the eviction interval, i.e., time for triggering the eviction (in
* milliseconds)
* milliseconds)
* @return a new pipe
*/
Pipe<T> slidingWindow(const WindowParams::WinType& wt, const unsigned int sz,
typename Window<T>::WindowOpFunc windowFunc = nullptr,
const unsigned int ei = 0) throw(TableException) {
typedef typename Window<T>::TimestampExtractorFunc ExtractorFunc;
ExtractorFunc fn;
......@@ -392,9 +395,9 @@ class Pipe {
if (wt == WindowParams::RangeWindow) {
// a range window requires a timestamp extractor
fn = boost::any_cast<ExtractorFunc>(timestampExtractor);
op = std::make_shared<SlidingWindow<T>>(fn, wt, sz, ei);
op = std::make_shared<SlidingWindow<T>>(fn, wt, sz, windowFunc, ei);
} else
op = std::make_shared<SlidingWindow<T>>(wt, sz, ei);
op = std::make_shared<SlidingWindow<T>>(wt, sz, windowFunc, ei);
auto iter = addPublisher<SlidingWindow<T>, DataSource<T>>(op);
return Pipe<T>(dataflow, iter, keyExtractor, timestampExtractor,
partitioningState, numPartitions);
......@@ -404,11 +407,11 @@ class Pipe {
// a range window requires a timestamp extractor
fn = boost::any_cast<ExtractorFunc>(timestampExtractor);
for (auto i = 0u; i < numPartitions; i++) {
ops.push_back(std::make_shared<SlidingWindow<T>>(fn, wt, sz, ei));
ops.push_back(std::make_shared<SlidingWindow<T>>(fn, wt, sz, windowFunc, ei));
}
} else {
for (auto i = 0u; i < numPartitions; i++) {
ops.push_back(std::make_shared<SlidingWindow<T>>(wt, sz, ei));
ops.push_back(std::make_shared<SlidingWindow<T>>(wt, sz, windowFunc, ei));
}
}
auto iter = addPartitionedPublisher<SlidingWindow<T>, T>(ops);
......@@ -433,13 +436,14 @@ class Pipe {
* the type of the window (row or range)
* @param[in] sz
* the window size (in number of tuples for row window or in
* milliseconds
* for range
* windows)
* milliseconds for range windows)
* @param[in] windowFunc
* optional function applied on each incoming tuple
* @return a new pipe
*/
Pipe<T> tumblingWindow(const WindowParams::WinType& wt,
const unsigned int sz) throw(TableException) {
const unsigned int sz,
typename Window<T>::WindowOpFunc windowFunc = nullptr) throw(TableException) {
typedef typename Window<T>::TimestampExtractorFunc ExtractorFunc;
ExtractorFunc fn;
......@@ -450,9 +454,9 @@ class Pipe {
if (wt == WindowParams::RangeWindow) {
// a range window requires a timestamp extractor
fn = boost::any_cast<ExtractorFunc>(timestampExtractor);
op = std::make_shared<TumblingWindow<T>>(fn, wt, sz);
op = std::make_shared<TumblingWindow<T>>(fn, wt, sz, windowFunc);
} else
op = std::make_shared<TumblingWindow<T>>(wt, sz);
op = std::make_shared<TumblingWindow<T>>(wt, sz, windowFunc);
auto iter = addPublisher<TumblingWindow<T>, DataSource<T>>(op);
return Pipe<T>(dataflow, iter, keyExtractor, timestampExtractor,
partitioningState, numPartitions);
......@@ -462,11 +466,11 @@ class Pipe {
// a range window requires a timestamp extractor
fn = boost::any_cast<ExtractorFunc>(timestampExtractor);
for (auto i = 0u; i < numPartitions; i++) {
ops.push_back(std::make_shared<TumblingWindow<T>>(fn, wt, sz));
ops.push_back(std::make_shared<TumblingWindow<T>>(fn, wt, sz, windowFunc));
}
} else {
for (auto i = 0u; i < numPartitions; i++) {
ops.push_back(std::make_shared<TumblingWindow<T>>(wt, sz));
ops.push_back(std::make_shared<TumblingWindow<T>>(wt, sz, windowFunc));
}
}
auto iter = addPartitionedPublisher<TumblingWindow<T>, T>(ops);
......
......@@ -65,12 +65,15 @@ namespace pfabric {
* @param func a function for extracting the timestamp value from the stream element
* @param wt the type of the window (range or row)
* @param sz the window size (seconds or number of tuples)
* @param windowFunc optional function for modifying incoming tuples
* @param ei the eviction interval, i.e., time for triggering the eviction (in milliseconds)
*/
SlidingWindow(typename Window<StreamElement>::TimestampExtractorFunc func,
const WindowParams::WinType& wt,
const unsigned int sz, const unsigned int ei = 0) :
WindowBase(func, wt, sz, ei ) {
const unsigned int sz,
typename Window<StreamElement>::WindowOpFunc windowFunc = nullptr,
const unsigned int ei = 0) :
WindowBase(func, wt, sz, windowFunc, ei ) {
if (ei == 0) {
// sliding window where the incoming tuple evicts outdated tuples
this->mEvictFun = std::bind( this->mWinType == WindowParams::RangeWindow ?
......@@ -92,11 +95,14 @@ namespace pfabric {
*
* @param wt the type of the window (range or row)
* @param sz the window size (seconds or number of tuples)
* @param windowFunc optional function for modifying incoming tuples
* @param ei the eviction interval, i.e., time for triggering the eviction (in milliseconds)
*/
SlidingWindow(const WindowParams::WinType& wt,
const unsigned int sz, const unsigned int ei = 0) :
WindowBase(wt, sz, ei ) {
const unsigned int sz,
typename Window<StreamElement>::WindowOpFunc windowFunc = nullptr,
const unsigned int ei = 0) :
WindowBase(wt, sz, windowFunc, ei ) {
if (ei == 0) {
// sliding window where the incoming tuple evicts outdated tuples
this->mEvictFun = std::bind( this->mWinType == WindowParams::RangeWindow ?
......@@ -147,22 +153,43 @@ namespace pfabric {
if( outdated == true ) {
// not sure if this is really necessary
this->getOutputDataChannel().publish(data, outdated);
}
else {
} else {
// if function available
if(this->mWindowOpFunc != nullptr) {
// insert the tuple into buffer
{
std::lock_guard<std::mutex> guard(this->mMtx);
this->mTupleBuf.push_back(data);
this->mCurrSize++;
}
{ //necessary for lock scope!
std::lock_guard<std::mutex> guard(this->mMtx);
this->mTupleBuf.push_back(data);
this->mCurrSize++;
}
// check for outdated tuples
if (!this->mEvictThread) {
this->mEvictFun();
}
// check for outdated tuples
if (!this->mEvictThread) {
this->mEvictFun();
}
// apply the window function - we do this after the window was updated
auto res = this->mWindowOpFunc(this->mTupleBuf.begin(),
this->mTupleBuf.end(), data);
// finally, forward the incoming tuple
this->getOutputDataChannel().publish(res, outdated);
} else {
// insert the tuple into buffer
{ //necessary for lock scope!
std::lock_guard<std::mutex> guard(this->mMtx);
this->mTupleBuf.push_back(data);
this->mCurrSize++;
}
// finally, forward the incoming tuple
this->getOutputDataChannel().publish(data, outdated);
// check for outdated tuples
if (!this->mEvictThread) {
this->mEvictFun();
}
// finally, forward the incoming tuple
this->getOutputDataChannel().publish(data, outdated);
}
}
}
......
......@@ -55,10 +55,12 @@ namespace pfabric {
* @param func a function for extracting the timestamp value from the stream element
* @param wt the type of the window (range or row)
* @param sz the window size (seconds or number of tuples)
* @param windowFunc optional function for modifying incoming tuples
*/
TumblingWindow(typename Window<StreamElement>::TimestampExtractorFunc func,
const WindowParams::WinType& wt, const unsigned int sz ) :
WindowBase(func, wt, sz, sz ) {
const WindowParams::WinType& wt, const unsigned int sz,
typename Window<StreamElement>::WindowOpFunc windowFunc = nullptr) :
WindowBase(func, wt, sz, windowFunc ) {
if( this->mWinType == WindowParams::RowWindow ) {
this->mEvictFun = std::bind(&TumblingWindow::evictByCount, this);
}
......@@ -72,9 +74,11 @@ namespace pfabric {
*
* @param wt the type of the window (range or row)
* @param sz the window size (seconds or number of tuples)
* @param windowFunc optional function for modifying incoming tuples
*/
TumblingWindow(const WindowParams::WinType& wt, const unsigned int sz ) :
WindowBase(wt, sz, sz ) {
TumblingWindow(const WindowParams::WinType& wt, const unsigned int sz,
typename Window<StreamElement>::WindowOpFunc windowFunc = nullptr) :
WindowBase(wt, sz, windowFunc ) {
if( this->mWinType == WindowParams::RowWindow ) {
this->mEvictFun = std::bind(&TumblingWindow::evictByCount, this);
}
......@@ -126,19 +130,42 @@ namespace pfabric {
return;
}
else {
// insert the tuple into buffer
{
std::lock_guard<std::mutex> guard(this->mMtx);
this->mTupleBuf.push_back(data);
this->mCurrSize++;
}
// if function available
if(this->mWindowOpFunc != nullptr) {
// insert the tuple into buffer
{ //necessary for lock scope!
std::lock_guard<std::mutex> guard(this->mMtx);
this->mTupleBuf.push_back(data);
this->mCurrSize++;
}
// apply function on tuple
auto res = this->mWindowOpFunc(this->mTupleBuf.begin(),
this->mTupleBuf.end(), data);
//forward the incoming tuple
this->getOutputDataChannel().publish(data, outdated);
this->getOutputDataChannel().publish(res, outdated);
// check for outdated tuples
if (!this->mEvictThread) {
this->mEvictFun();
}
} else {
// insert the tuple into buffer
{ //necessary for lock scope!
std::lock_guard<std::mutex> guard(this->mMtx);
this->mTupleBuf.push_back(data);
this->mCurrSize++;
}
// check for outdated tuples
if (!this->mEvictThread) {
this->mEvictFun();
//forward the incoming tuple
this->getOutputDataChannel().publish(data, outdated);
// check for outdated tuples
if (!this->mEvictThread) {
this->mEvictFun();
}
}
}
}
......
......@@ -71,8 +71,18 @@ namespace pfabric {
public UnaryTransform< StreamElement, StreamElement > // use default unary transform
{
public:
typedef typename std::list<StreamElement>::const_iterator ElementIterator;
typedef std::function<Timestamp(const StreamElement&)> TimestampExtractorFunc;
/**
* An optional function that can be applied to the entire window
* when a new tuple arrives.
*/
typedef std::function<StreamElement(ElementIterator beg,
ElementIterator end,
const StreamElement&)> WindowOpFunc;
protected:
PFABRIC_UNARY_TRANSFORM_TYPEDEFS(StreamElement, StreamElement);
......@@ -82,11 +92,12 @@ namespace pfabric {
* @param func a function for extracting the timestamp value from the stream element
* @param wt the type of the window (range or row)
* @param sz the window size (seconds or number of tuples)
* @param winOpFunc optional function for modifying incoming tuples
* @param ei the eviction interval, i.e., time for triggering the eviction (in milliseconds)
*/
Window(TimestampExtractorFunc func, const WindowParams::WinType& wt,
const unsigned int sz, const unsigned int ei = 0) :
mTimestampExtractor(func), mWinType(wt), mWinSize(sz), mEvictInterval(ei), mCurrSize(0) {
const unsigned int sz, WindowOpFunc winOpFunc = nullptr, const unsigned int ei = 0) :
mTimestampExtractor(func), mWinType(wt), mWinSize(sz), mWindowOpFunc(winOpFunc), mEvictInterval(ei), mCurrSize(0) {
mDiffTime = (mWinType == WindowParams::RangeWindow ?
boost::posix_time::seconds(mWinSize).total_microseconds() : 0
);
......@@ -100,11 +111,12 @@ namespace pfabric {
*
* @param wt the type of the window (range or row)
* @param sz the window size (seconds or number of tuples)
* @param winOpFunc optional function for modifying incoming tuples
* @param ei the eviction interval, i.e., time for triggering the eviction (in milliseconds)
*/
Window(const WindowParams::WinType& wt,
const unsigned int sz, const unsigned int ei = 0) :
mWinType(wt), mWinSize(sz), mEvictInterval(ei), mCurrSize(0), mDiffTime(0) {
const unsigned int sz, WindowOpFunc winOpFunc = nullptr, const unsigned int ei = 0) :
mWinType(wt), mWinSize(sz), mWindowOpFunc(winOpFunc), mEvictInterval(ei), mCurrSize(0), mDiffTime(0) {
BOOST_ASSERT_MSG(mWinType == WindowParams::RowWindow, "RowWindow requires timestamp extractor function.");
}
......@@ -116,6 +128,7 @@ namespace pfabric {
TimestampExtractorFunc mTimestampExtractor; //< a function for extracting timestamps from a tuple
WindowParams::WinType mWinType; //< the type of window
unsigned int mWinSize; //< the size of window (time or number of tuples)
WindowOpFunc mWindowOpFunc; //< function for modifying incoming tuples
unsigned int mEvictInterval; //< the slide length of window (time or number of tuples)
TupleList mTupleBuf; //< the actual window buffer
unsigned int mCurrSize; //< the current number of tuples in the window
......
......@@ -340,7 +340,7 @@ TEST_CASE("Tuplifying a stream of RDF strings", "[Tuplifier]") {
Topology t;
auto s = t.newStreamFromFile(std::string(TEST_DATA_DIRECTORY) + "tuplifier_test1.in")
.extract<Triple>(',')
.tuplify<RDFTuple>({ "http://data.org/name", "http://data.org/price", "http://data.org/someOther" },
.tuplify<RDFTuple>({ "http://data.org/name", "http://data.org/price", "http://data.org/someOther" },
TuplifierParams::ORDERED)
.notify([&](auto tp, bool outdated) {
std::lock_guard<std::mutex> lock(r_mutex);
......@@ -349,3 +349,64 @@ TEST_CASE("Tuplifying a stream of RDF strings", "[Tuplifier]") {
t.start(false);
REQUIRE(results.size() == 3);
}
TEST_CASE("Using a window with and without additional function", "[Window]") {
  using T1 = TuplePtr<int, std::string, double>;
  using T2 = TuplePtr<int>;
  using AggrStateSum = Aggregator1<T1, AggrSum<double>, 2>;

  // All three topologies below read the same generated input file.
  TestDataGenerator tgen("file.csv");
  tgen.writeData(10);

  // --- Scenario 1: row window of size 3 without a window function ---
  std::stringstream plainOut;
  const std::string expectedPlain =
      "0.5\n101\n301.5\n601.5\n901.5\n1201.5\n1501.5\n1801.5\n2101.5\n2401.5\n";

  Topology t1;
  auto s1 = t1.newStreamFromFile("file.csv")
                .extract<T1>(',')
                .slidingWindow(WindowParams::RowWindow, 3)
                .aggregate<AggrStateSum>()
                .print(plainOut);
  t1.start(false);
  REQUIRE(plainOut.str() == expectedPlain);

  // --- Scenario 2: same window, but the window function adds one to the
  //     double attribute of every incoming tuple before it is forwarded ---
  std::stringstream incrementedOut;
  const std::string expectedIncremented =
      "1.5\n103\n304.5\n604.5\n904.5\n1204.5\n1504.5\n1804.5\n2104.5\n2404.5\n";

  auto incrementDouble = [](auto beg, auto end, auto tp) {
    get<2>(tp)++;
    return tp;
  };

  Topology t2;
  auto s2 = t2.newStreamFromFile("file.csv")
                .extract<T1>(',')
                .slidingWindow(WindowParams::RowWindow, 3, incrementDouble)
                .aggregate<AggrStateSum>()
                .print(incrementedOut);
  t2.start(false);
  REQUIRE(incrementedOut.str() == expectedIncremented);

  // --- Scenario 3: the window function replaces each incoming tuple by the
  //     upper median of the int values passed in the range [beg, end) ---
  std::stringstream medianOut;
  const std::string expectedMedian = "0\n1\n1\n2\n2\n3\n4\n5\n6\n7\n";

  auto medianOfWindow = [](auto beg, auto end, auto tp) {
    std::vector<int> values;
    for (auto cur = beg; cur != end; ++cur) {
      values.push_back(get<0>(*cur));
    }
    std::sort(values.begin(), values.end());
    return makeTuplePtr(values[values.size()/2]);
  };

  Topology t3;
  auto s3 = t3.newStreamFromFile("file.csv")
                .extract<T1>(',')
                .map<T2>([](auto tp, bool outdated) -> T2 { return makeTuplePtr(get<0>(tp)); })
                .slidingWindow(WindowParams::RowWindow, 5, medianOfWindow)
                .print(medianOut);
  t3.start(false);
  REQUIRE(medianOut.str() == expectedMedian);
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment