Skip to content
GitLab
Menu
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
code
pfabric
Commits
96c97fb7
Commit
96c97fb7
authored
Aug 09, 2017
by
Constantin Pohl
Browse files
Improved Window-Operators with optional function & test case
parent
7f55dec4
Changes
6
Hide whitespace changes
Inline
Side-by-side
documentation/Operators.md
View file @
96c97fb7
...
...
@@ -326,11 +326,12 @@ auto s = t.newStreamFromFile("data.csv")
#### slidingWindow ####
`Pipe<T> Pipe::slidingWindow(WindowParams::WinType wt, unsigned int sz, unsigned int ei)`
`Pipe<T> Pipe::slidingWindow(WindowParams::WinType wt, unsigned int sz,
WindowOpFunc windowFunc,
unsigned int ei)`
This operator defines a sliding window of the given type and size on the stream. The window type
`wt`
can be
row (count-based) or range (time-based) for which
`sz`
specifies the size (in tuples or milliseconds).
In case of a range (time-based) window the
`assignTimestamps`
operator has to defined before on the stream.
row (count-based) or range (time-based) for which
`sz`
specifies the size (in tuples or milliseconds).
In case of a range (time-based) window the
`assignTimestamps`
operator has to defined before on the stream.
The optional
`windowFunc`
parameter can be used with a lambda function to modify each incoming tuple of the window.
The optional parameter
`ei`
denotes the eviction interval, i.e. the time interval (in milliseconds) for
for triggering the eviction of tuples from the window.
In the following example a time-based sliding window of 60 seconds is created. The timestamp of the tuples is
...
...
@@ -347,10 +348,11 @@ auto s = s.createStreamFromFile()
#### tumblingWindow ####
`Pipe<T> Pipe::tumblingWindow(WindowParams::WinType wt, unsigned int sz)`
`Pipe<T> Pipe::tumblingWindow(WindowParams::WinType wt, unsigned int sz
, WindowOpFunc windowFunc
)`
The
`tumblingWindow`
operator creates a row or range-based tumbling window of the given size
`sz`
. In
contrast to a sliding window a tumbling window invalidates all tuples as soon as the window is completely
The
`tumblingWindow`
operator creates a row or range-based tumbling window of the given size
`sz`
.
The optional
`windowFunc`
parameter can be used with a lambda function to modify each incoming tuple of the window.
In contrast to a sliding window a tumbling window invalidates all tuples as soon as the window is completely
filled - either by its size (row) or time difference of the oldest and most recent tuple. As in
`slidingWindow`
a range-based window requires to specify the timestamp column with
`assignTimestamps`
.
The following example code creates tumbling window that outdates the tuples after every 100 processed
...
...
src/dsl/Pipe.hpp
View file @
96c97fb7
...
...
@@ -371,14 +371,17 @@ class Pipe {
* the type of the window (row or range)
* @param[in] sz
* the window size (in number of tuples for row window or in milliseconds
* for range
*
for range
* windows)
* @param[in] windowFunc
* optional function applied on each incoming tuple
* @param[in] ei
* the eviction interval, i.e., time for triggering the eviction (in
* milliseconds)
*
milliseconds)
* @return a new pipe
*/
Pipe
<
T
>
slidingWindow
(
const
WindowParams
::
WinType
&
wt
,
const
unsigned
int
sz
,
typename
Window
<
T
>::
WindowOpFunc
windowFunc
=
nullptr
,
const
unsigned
int
ei
=
0
)
throw
(
TableException
)
{
typedef
typename
Window
<
T
>::
TimestampExtractorFunc
ExtractorFunc
;
ExtractorFunc
fn
;
...
...
@@ -392,9 +395,9 @@ class Pipe {
if
(
wt
==
WindowParams
::
RangeWindow
)
{
// a range window requires a timestamp extractor
fn
=
boost
::
any_cast
<
ExtractorFunc
>
(
timestampExtractor
);
op
=
std
::
make_shared
<
SlidingWindow
<
T
>>
(
fn
,
wt
,
sz
,
ei
);
op
=
std
::
make_shared
<
SlidingWindow
<
T
>>
(
fn
,
wt
,
sz
,
windowFunc
,
ei
);
}
else
op
=
std
::
make_shared
<
SlidingWindow
<
T
>>
(
wt
,
sz
,
ei
);
op
=
std
::
make_shared
<
SlidingWindow
<
T
>>
(
wt
,
sz
,
windowFunc
,
ei
);
auto
iter
=
addPublisher
<
SlidingWindow
<
T
>
,
DataSource
<
T
>>
(
op
);
return
Pipe
<
T
>
(
dataflow
,
iter
,
keyExtractor
,
timestampExtractor
,
partitioningState
,
numPartitions
);
...
...
@@ -404,11 +407,11 @@ class Pipe {
// a range window requires a timestamp extractor
fn
=
boost
::
any_cast
<
ExtractorFunc
>
(
timestampExtractor
);
for
(
auto
i
=
0u
;
i
<
numPartitions
;
i
++
)
{
ops
.
push_back
(
std
::
make_shared
<
SlidingWindow
<
T
>>
(
fn
,
wt
,
sz
,
ei
));
ops
.
push_back
(
std
::
make_shared
<
SlidingWindow
<
T
>>
(
fn
,
wt
,
sz
,
windowFunc
,
ei
));
}
}
else
{
for
(
auto
i
=
0u
;
i
<
numPartitions
;
i
++
)
{
ops
.
push_back
(
std
::
make_shared
<
SlidingWindow
<
T
>>
(
wt
,
sz
,
ei
));
ops
.
push_back
(
std
::
make_shared
<
SlidingWindow
<
T
>>
(
wt
,
sz
,
windowFunc
,
ei
));
}
}
auto
iter
=
addPartitionedPublisher
<
SlidingWindow
<
T
>
,
T
>
(
ops
);
...
...
@@ -433,13 +436,14 @@ class Pipe {
* the type of the window (row or range)
* @param[in] sz
* the window size (in number of tuples for row window or in
* milliseconds
*
for range
*
windows)
*
milliseconds
for range windows)
*
@param[in] windowFunc
*
optional function applied on each incoming tuple
* @return a new pipe
*/
Pipe
<
T
>
tumblingWindow
(
const
WindowParams
::
WinType
&
wt
,
const
unsigned
int
sz
)
throw
(
TableException
)
{
const
unsigned
int
sz
,
typename
Window
<
T
>::
WindowOpFunc
windowFunc
=
nullptr
)
throw
(
TableException
)
{
typedef
typename
Window
<
T
>::
TimestampExtractorFunc
ExtractorFunc
;
ExtractorFunc
fn
;
...
...
@@ -450,9 +454,9 @@ class Pipe {
if
(
wt
==
WindowParams
::
RangeWindow
)
{
// a range window requires a timestamp extractor
fn
=
boost
::
any_cast
<
ExtractorFunc
>
(
timestampExtractor
);
op
=
std
::
make_shared
<
TumblingWindow
<
T
>>
(
fn
,
wt
,
sz
);
op
=
std
::
make_shared
<
TumblingWindow
<
T
>>
(
fn
,
wt
,
sz
,
windowFunc
);
}
else
op
=
std
::
make_shared
<
TumblingWindow
<
T
>>
(
wt
,
sz
);
op
=
std
::
make_shared
<
TumblingWindow
<
T
>>
(
wt
,
sz
,
windowFunc
);
auto
iter
=
addPublisher
<
TumblingWindow
<
T
>
,
DataSource
<
T
>>
(
op
);
return
Pipe
<
T
>
(
dataflow
,
iter
,
keyExtractor
,
timestampExtractor
,
partitioningState
,
numPartitions
);
...
...
@@ -462,11 +466,11 @@ class Pipe {
// a range window requires a timestamp extractor
fn
=
boost
::
any_cast
<
ExtractorFunc
>
(
timestampExtractor
);
for
(
auto
i
=
0u
;
i
<
numPartitions
;
i
++
)
{
ops
.
push_back
(
std
::
make_shared
<
TumblingWindow
<
T
>>
(
fn
,
wt
,
sz
));
ops
.
push_back
(
std
::
make_shared
<
TumblingWindow
<
T
>>
(
fn
,
wt
,
sz
,
windowFunc
));
}
}
else
{
for
(
auto
i
=
0u
;
i
<
numPartitions
;
i
++
)
{
ops
.
push_back
(
std
::
make_shared
<
TumblingWindow
<
T
>>
(
wt
,
sz
));
ops
.
push_back
(
std
::
make_shared
<
TumblingWindow
<
T
>>
(
wt
,
sz
,
windowFunc
));
}
}
auto
iter
=
addPartitionedPublisher
<
TumblingWindow
<
T
>
,
T
>
(
ops
);
...
...
src/qop/SlidingWindow.hpp
View file @
96c97fb7
...
...
@@ -65,12 +65,15 @@ namespace pfabric {
* @param func a function for extracting the timestamp value from the stream element
* @param wt the type of the window (range or row)
* @param sz the window size (seconds or number of tuples)
* @param windowFunc optional function for modifying incoming tuples
* @param ei ei the eviction interval, i.e., time for triggering the eviction (in milliseconds)
*/
SlidingWindow
(
typename
Window
<
StreamElement
>::
TimestampExtractorFunc
func
,
const
WindowParams
::
WinType
&
wt
,
const
unsigned
int
sz
,
const
unsigned
int
ei
=
0
)
:
WindowBase
(
func
,
wt
,
sz
,
ei
)
{
const
unsigned
int
sz
,
typename
Window
<
StreamElement
>::
WindowOpFunc
windowFunc
=
nullptr
,
const
unsigned
int
ei
=
0
)
:
WindowBase
(
func
,
wt
,
sz
,
windowFunc
,
ei
)
{
if
(
ei
==
0
)
{
// sliding window where the incoming tuple evicts outdated tuples
this
->
mEvictFun
=
std
::
bind
(
this
->
mWinType
==
WindowParams
::
RangeWindow
?
...
...
@@ -92,11 +95,14 @@ namespace pfabric {
*
* @param wt the type of the window (range or row)
* @param sz the window size (seconds or number of tuples)
* @param windowFunc optional function for modifying incoming tuples
* @param ei ei the eviction interval, i.e., time for triggering the eviction (in milliseconds)
*/
SlidingWindow
(
const
WindowParams
::
WinType
&
wt
,
const
unsigned
int
sz
,
const
unsigned
int
ei
=
0
)
:
WindowBase
(
wt
,
sz
,
ei
)
{
const
unsigned
int
sz
,
typename
Window
<
StreamElement
>::
WindowOpFunc
windowFunc
=
nullptr
,
const
unsigned
int
ei
=
0
)
:
WindowBase
(
wt
,
sz
,
windowFunc
,
ei
)
{
if
(
ei
==
0
)
{
// sliding window where the incoming tuple evicts outdated tuples
this
->
mEvictFun
=
std
::
bind
(
this
->
mWinType
==
WindowParams
::
RangeWindow
?
...
...
@@ -147,22 +153,43 @@ namespace pfabric {
if
(
outdated
==
true
)
{
// not sure if this is really necessary
this
->
getOutputDataChannel
().
publish
(
data
,
outdated
);
}
else
{
// insert the tuple into buffer
{
std
::
lock_guard
<
std
::
mutex
>
guard
(
this
->
mMtx
);
this
->
mTupleBuf
.
push_back
(
data
);
this
->
mCurrSize
++
;
}
}
else
{
// if function available
if
(
this
->
mWindowOpFunc
!=
nullptr
)
{
// apply function on tuple
auto
res
=
this
->
mWindowOpFunc
(
data
);
// insert the tuple into buffer
{
//necessary for lock scope!
std
::
lock_guard
<
std
::
mutex
>
guard
(
this
->
mMtx
);
this
->
mTupleBuf
.
push_back
(
res
);
this
->
mCurrSize
++
;
}
// check for outdated tuples
if
(
!
this
->
mEvictThread
)
{
this
->
mEvictFun
();
}
// check for outdated tuples
if
(
!
this
->
mEvictThread
)
{
this
->
mEvictFun
();
}
// finally, forward the incoming tuple
this
->
getOutputDataChannel
().
publish
(
data
,
outdated
);
// finally, forward the incoming tuple
this
->
getOutputDataChannel
().
publish
(
res
,
outdated
);
}
else
{
// insert the tuple into buffer
{
//necessary for lock scope!
std
::
lock_guard
<
std
::
mutex
>
guard
(
this
->
mMtx
);
this
->
mTupleBuf
.
push_back
(
data
);
this
->
mCurrSize
++
;
}
// check for outdated tuples
if
(
!
this
->
mEvictThread
)
{
this
->
mEvictFun
();
}
// finally, forward the incoming tuple
this
->
getOutputDataChannel
().
publish
(
data
,
outdated
);
}
}
}
...
...
src/qop/TumblingWindow.hpp
View file @
96c97fb7
...
...
@@ -55,10 +55,12 @@ namespace pfabric {
* @param func a function for extracting the timestamp value from the stream element
* @param wt the type of the window (range or row)
* @param sz the window size (seconds or number of tuples)
* @param windowFunc optional function for modifying incoming tuples
*/
TumblingWindow
(
typename
Window
<
StreamElement
>::
TimestampExtractorFunc
func
,
const
WindowParams
::
WinType
&
wt
,
const
unsigned
int
sz
)
:
WindowBase
(
func
,
wt
,
sz
,
sz
)
{
const
WindowParams
::
WinType
&
wt
,
const
unsigned
int
sz
,
typename
Window
<
StreamElement
>::
WindowOpFunc
windowFunc
=
nullptr
)
:
WindowBase
(
func
,
wt
,
sz
,
windowFunc
)
{
if
(
this
->
mWinType
==
WindowParams
::
RowWindow
)
{
this
->
mEvictFun
=
std
::
bind
(
&
TumblingWindow
::
evictByCount
,
this
);
}
...
...
@@ -72,9 +74,11 @@ namespace pfabric {
*
* @param wt the type of the window (range or row)
* @param sz the window size (seconds or number of tuples)
* @param windowFunc optional function for modifying incoming tuples
*/
TumblingWindow
(
const
WindowParams
::
WinType
&
wt
,
const
unsigned
int
sz
)
:
WindowBase
(
wt
,
sz
,
sz
)
{
TumblingWindow
(
const
WindowParams
::
WinType
&
wt
,
const
unsigned
int
sz
,
typename
Window
<
StreamElement
>::
WindowOpFunc
windowFunc
=
nullptr
)
:
WindowBase
(
wt
,
sz
,
windowFunc
)
{
if
(
this
->
mWinType
==
WindowParams
::
RowWindow
)
{
this
->
mEvictFun
=
std
::
bind
(
&
TumblingWindow
::
evictByCount
,
this
);
}
...
...
@@ -126,19 +130,41 @@ namespace pfabric {
return
;
}
else
{
// insert the tuple into buffer
{
std
::
lock_guard
<
std
::
mutex
>
guard
(
this
->
mMtx
);
this
->
mTupleBuf
.
push_back
(
data
);
this
->
mCurrSize
++
;
}
// if function available
if
(
this
->
mWindowOpFunc
!=
nullptr
)
{
// apply function on tuple
auto
res
=
this
->
mWindowOpFunc
(
data
);
// insert the tuple into buffer
{
//necessary for lock scope!
std
::
lock_guard
<
std
::
mutex
>
guard
(
this
->
mMtx
);
this
->
mTupleBuf
.
push_back
(
res
);
this
->
mCurrSize
++
;
}
//forward the incoming tuple
this
->
getOutputDataChannel
().
publish
(
data
,
outdated
);
//forward the incoming tuple
this
->
getOutputDataChannel
().
publish
(
res
,
outdated
);
// check for outdated tuples
if
(
!
this
->
mEvictThread
)
{
this
->
mEvictFun
();
}
// check for outdated tuples
if
(
!
this
->
mEvictThread
)
{
this
->
mEvictFun
();
}
else
{
// insert the tuple into buffer
{
//necessary for lock scope!
std
::
lock_guard
<
std
::
mutex
>
guard
(
this
->
mMtx
);
this
->
mTupleBuf
.
push_back
(
data
);
this
->
mCurrSize
++
;
}
//forward the incoming tuple
this
->
getOutputDataChannel
().
publish
(
data
,
outdated
);
// check for outdated tuples
if
(
!
this
->
mEvictThread
)
{
this
->
mEvictFun
();
}
}
}
}
...
...
src/qop/Window.hpp
View file @
96c97fb7
...
...
@@ -73,6 +73,8 @@ namespace pfabric {
public:
typedef
std
::
function
<
Timestamp
(
const
StreamElement
&
)
>
TimestampExtractorFunc
;
typedef
std
::
function
<
StreamElement
(
const
StreamElement
&
)
>
WindowOpFunc
;
//lambda function applied to incoming tuples in window
protected:
PFABRIC_UNARY_TRANSFORM_TYPEDEFS
(
StreamElement
,
StreamElement
);
...
...
@@ -82,11 +84,12 @@ namespace pfabric {
* @param func a function for extracting the timestamp value from the stream element
* @param wt the type of the window (range or row)
* @param sz the window size (seconds or number of tuples)
* @param winOpFunc optional function for modifying incoming tuples
* @param ei the eviction interval, i.e., time for triggering the eviction (in milliseconds)
*/
Window
(
TimestampExtractorFunc
func
,
const
WindowParams
::
WinType
&
wt
,
const
unsigned
int
sz
,
const
unsigned
int
ei
=
0
)
:
mTimestampExtractor
(
func
),
mWinType
(
wt
),
mWinSize
(
sz
),
mEvictInterval
(
ei
),
mCurrSize
(
0
)
{
const
unsigned
int
sz
,
WindowOpFunc
winOpFunc
=
nullptr
,
const
unsigned
int
ei
=
0
)
:
mTimestampExtractor
(
func
),
mWinType
(
wt
),
mWinSize
(
sz
),
mWindowOpFunc
(
winOpFunc
),
mEvictInterval
(
ei
),
mCurrSize
(
0
)
{
mDiffTime
=
(
mWinType
==
WindowParams
::
RangeWindow
?
boost
::
posix_time
::
seconds
(
mWinSize
).
total_microseconds
()
:
0
);
...
...
@@ -100,11 +103,12 @@ namespace pfabric {
*
* @param wt the type of the window (range or row)
* @param sz the window size (seconds or number of tuples)
* @param winOpFunc optional function for modifying incoming tuples
* @param ei ei the eviction interval, i.e., time for triggering the eviction (in milliseconds)
*/
Window
(
const
WindowParams
::
WinType
&
wt
,
const
unsigned
int
sz
,
const
unsigned
int
ei
=
0
)
:
mWinType
(
wt
),
mWinSize
(
sz
),
mEvictInterval
(
ei
),
mCurrSize
(
0
),
mDiffTime
(
0
)
{
const
unsigned
int
sz
,
WindowOpFunc
winOpFunc
=
nullptr
,
const
unsigned
int
ei
=
0
)
:
mWinType
(
wt
),
mWinSize
(
sz
),
mWindowOpFunc
(
winOpFunc
),
mEvictInterval
(
ei
),
mCurrSize
(
0
),
mDiffTime
(
0
)
{
BOOST_ASSERT_MSG
(
mWinType
==
WindowParams
::
RowWindow
,
"RowWindow requires timestamp extractor function."
);
}
...
...
@@ -125,6 +129,8 @@ namespace pfabric {
EvictionThread
mEvictThread
;
//< the thread for running the eviction function
//< (if the eviction interval > 0)
mutable
std
::
mutex
mMtx
;
//< mutex for accessing the tuple buffer
WindowOpFunc
mWindowOpFunc
;
//< function for modifying incoming tuples
};
/**
...
...
src/test/TopologyTest.cpp
View file @
96c97fb7
...
...
@@ -349,3 +349,39 @@ TEST_CASE("Tuplifying a stream of RDF strings", "[Tuplifier]") {
t
.
start
(
false
);
REQUIRE
(
results
.
size
()
==
3
);
}
TEST_CASE
(
"Using a window with and without additional function"
,
"[Window]"
)
{
typedef
TuplePtr
<
int
,
std
::
string
,
double
>
T1
;
typedef
Aggregator1
<
T1
,
AggrSum
<
double
>
,
2
>
AggrStateSum
;
TestDataGenerator
tgen
(
"file.csv"
);
tgen
.
writeData
(
10
);
std
::
stringstream
strm
;
std
::
string
expected
=
"0.5
\n
101
\n
301.5
\n
601.5
\n
901.5
\n
1201.5
\n
1501.5
\n
1801.5
\n
2101.5
\n
2401.5
\n
"
;
Topology
t1
;
auto
s1
=
t1
.
newStreamFromFile
(
"file.csv"
)
.
extract
<
T1
>
(
','
)
.
slidingWindow
(
WindowParams
::
RowWindow
,
3
)
.
aggregate
<
AggrStateSum
>
()
.
print
(
strm
);
t1
.
start
(
false
);
REQUIRE
(
strm
.
str
()
==
expected
);
std
::
stringstream
strm2
;
expected
=
"1.5
\n
103
\n
304.5
\n
604.5
\n
904.5
\n
1204.5
\n
1504.5
\n
1804.5
\n
2104.5
\n
2404.5
\n
"
;
auto
winFunc
=
[](
auto
tp
)
{
get
<
2
>
(
tp
)
++
;
return
tp
;
};
//just increment incoming tuples double-attribute by one
Topology
t2
;
auto
s2
=
t2
.
newStreamFromFile
(
"file.csv"
)
.
extract
<
T1
>
(
','
)
.
slidingWindow
(
WindowParams
::
RowWindow
,
3
,
winFunc
)
.
aggregate
<
AggrStateSum
>
()
.
print
(
strm2
);
t2
.
start
(
false
);
REQUIRE
(
strm2
.
str
()
==
expected
);
}
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment