Skip to content
GitLab
Menu
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
code
pfabric
Commits
a5b588ee
Commit
a5b588ee
authored
Aug 17, 2017
by
Constantin Pohl
Browse files
Removed warning & improved test case
parent
44d2d981
Changes
2
Hide whitespace changes
Inline
Side-by-side
src/qop/Window.hpp
View file @
a5b588ee
...
...
@@ -128,6 +128,7 @@ namespace pfabric {
TimestampExtractorFunc
mTimestampExtractor
;
//< a function for extracting timestamps from a tuple
WindowParams
::
WinType
mWinType
;
//< the type of window
unsigned
int
mWinSize
;
//< the size of window (time or number of tuples)
WindowOpFunc
mWindowOpFunc
;
//< function for modifying incoming tuples
unsigned
int
mEvictInterval
;
//< the slide length of window (time or number of tuples)
TupleList
mTupleBuf
;
//< the actual window buffer
unsigned
int
mCurrSize
;
//< the current number of tuples in the window
...
...
@@ -137,8 +138,6 @@ namespace pfabric {
EvictionThread
mEvictThread
;
//< the thread for running the eviction function
//< (if the eviction interval > 0)
mutable
std
::
mutex
mMtx
;
//< mutex for accessing the tuple buffer
WindowOpFunc
mWindowOpFunc
;
//< function for modifying incoming tuples
};
/**
...
...
src/test/TopologyTest.cpp
View file @
a5b588ee
...
...
@@ -352,6 +352,7 @@ TEST_CASE("Tuplifying a stream of RDF strings", "[Tuplifier]") {
TEST_CASE
(
"Using a window with and without additional function"
,
"[Window]"
)
{
typedef
TuplePtr
<
int
,
std
::
string
,
double
>
T1
;
typedef
TuplePtr
<
int
>
T2
;
typedef
Aggregator1
<
T1
,
AggrSum
<
double
>
,
2
>
AggrStateSum
;
TestDataGenerator
tgen
(
"file.csv"
);
...
...
@@ -373,7 +374,8 @@ TEST_CASE("Using a window with and without additional function", "[Window]") {
std
::
stringstream
strm2
;
expected
=
"1.5
\n
103
\n
304.5
\n
604.5
\n
904.5
\n
1204.5
\n
1504.5
\n
1804.5
\n
2104.5
\n
2404.5
\n
"
;
auto
winFunc
=
[](
auto
beg
,
auto
end
,
auto
tp
)
{
get
<
2
>
(
tp
)
++
;
return
tp
;
};
//just increment incoming tuples double-attribute by one
//just increment incoming tuples double-attribute by one
auto
winFunc
=
[](
auto
beg
,
auto
end
,
auto
tp
)
{
get
<
2
>
(
tp
)
++
;
return
tp
;
};
Topology
t2
;
auto
s2
=
t2
.
newStreamFromFile
(
"file.csv"
)
...
...
@@ -384,4 +386,27 @@ TEST_CASE("Using a window with and without additional function", "[Window]") {
t2
.
start
(
false
);
REQUIRE
(
strm2
.
str
()
==
expected
);
std
::
stringstream
strm3
;
expected
=
"0
\n
1
\n
1
\n
2
\n
2
\n
3
\n
4
\n
5
\n
6
\n
7
\n
"
;
//find median of ints
auto
winFuncMedian
=
[](
auto
beg
,
auto
end
,
auto
tp
)
{
std
::
vector
<
int
>
winInts
(
0
);
for
(
auto
it
=
beg
;
it
!=
end
;
++
it
)
{
winInts
.
push_back
(
get
<
0
>
(
*
it
));
}
std
::
sort
(
winInts
.
begin
(),
winInts
.
end
());
return
makeTuplePtr
(
winInts
[
winInts
.
size
()
/
2
]);
};
Topology
t3
;
auto
s3
=
t3
.
newStreamFromFile
(
"file.csv"
)
.
extract
<
T1
>
(
','
)
.
map
<
T2
>
([](
auto
tp
,
bool
outdated
)
->
T2
{
return
makeTuplePtr
(
get
<
0
>
(
tp
));
})
.
slidingWindow
(
WindowParams
::
RowWindow
,
5
,
winFuncMedian
)
.
print
(
strm3
);
t3
.
start
(
false
);
REQUIRE
(
strm3
.
str
()
==
expected
);
}
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment