Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
code
pfabric
Commits
5e0c076f
Commit
5e0c076f
authored
Dec 21, 2017
by
Kai-Uwe Sattler
Browse files
groupby_key added to python interface
parent
0bb43d0b
Changes
8
Hide whitespace changes
Inline
Side-by-side
src/dsl/Pipe.hpp
View file @
5e0c076f
...
...
@@ -1088,7 +1088,7 @@ class Pipe {
const
unsigned
int
tInterval
=
0
)
noexcept
(
false
)
{
static_assert
(
typename
AggrStateTraits
<
AggrState
>::
type
(),
"groupBy requires an AggrState class"
);
return
groupBy
<
typename
AggrState
::
ResultTypePtr
,
AggrState
,
KeyType
>
(
AggrState
::
finalize
,
AggrState
::
iterate
,
tType
,
tInterval
);
AggrState
::
finalize
,
AggrState
::
iterate
ForKey
,
tType
,
tInterval
);
}
/**
...
...
@@ -1124,6 +1124,51 @@ class Pipe {
* the interval for producing aggregate tuples
* @return a new pipe
*/
template
<
typename
Tout
,
typename
AggrState
,
typename
KeyType
=
DefaultKeyType
>
Pipe
<
Tout
>
groupBy
(
typename
AggrState
::
AggrStatePtr
&
state
,
typename
GroupedAggregation
<
T
,
Tout
,
AggrState
,
KeyType
>::
FactoryFunc
createFun
,
typename
GroupedAggregation
<
T
,
Tout
,
AggrState
,
KeyType
>::
FinalFunc
finalFun
,
typename
GroupedAggregation
<
T
,
Tout
,
AggrState
,
KeyType
>::
IterateFunc
iterFun
,
AggregationTriggerType
tType
=
TriggerAll
,
const
unsigned
int
tInterval
=
0
)
noexcept
(
false
)
{
try
{
typedef
std
::
function
<
KeyType
(
const
T
&
)
>
KeyExtractorFunc
;
KeyExtractorFunc
keyFunc
=
boost
::
any_cast
<
KeyExtractorFunc
>
(
keyExtractor
);
if
(
partitioningState
==
NoPartitioning
)
{
auto
op
=
std
::
make_shared
<
GroupedAggregation
<
T
,
Tout
,
AggrState
,
KeyType
>>
(
state
,
createFun
,
keyFunc
,
finalFun
,
iterFun
,
tType
,
tInterval
);
auto
iter
=
addPublisher
<
GroupedAggregation
<
T
,
Tout
,
AggrState
,
KeyType
>
,
DataSource
<
T
>>
(
op
);
return
Pipe
<
Tout
>
(
dataflow
,
iter
,
keyExtractor
,
timestampExtractor
,
transactionIDExtractor
,
partitioningState
,
numPartitions
);
}
else
{
std
::
vector
<
std
::
shared_ptr
<
GroupedAggregation
<
T
,
Tout
,
AggrState
,
KeyType
>>>
ops
;
for
(
auto
i
=
0u
;
i
<
numPartitions
;
i
++
)
{
ops
.
push_back
(
std
::
make_shared
<
GroupedAggregation
<
T
,
Tout
,
AggrState
,
KeyType
>>
(
state
,
createFun
,
keyFunc
,
finalFun
,
iterFun
,
tType
,
tInterval
));
}
auto
iter
=
addPartitionedPublisher
<
GroupedAggregation
<
T
,
Tout
,
AggrState
,
KeyType
>
,
T
>
(
ops
);
return
Pipe
<
Tout
>
(
dataflow
,
iter
,
keyExtractor
,
timestampExtractor
,
transactionIDExtractor
,
partitioningState
,
numPartitions
);
}
}
catch
(
boost
::
bad_any_cast
&
e
)
{
throw
TopologyException
(
"No KeyExtractor defined for groupBy."
);
}
}
template
<
typename
Tout
,
typename
AggrState
,
typename
KeyType
=
DefaultKeyType
>
Pipe
<
Tout
>
groupBy
(
...
...
src/python/PyAggregateState.cpp
View file @
5e0c076f
...
...
@@ -26,8 +26,20 @@ namespace bp = boost::python;
PyAggregateState
::
PyAggregateState
(
const
std
::
vector
<
int
>&
cols
,
const
std
::
vector
<
AggrFuncType
>&
funcs
)
:
mColumns
(
cols
),
mFuncSpecs
(
funcs
)
{
setupAggregateFuncs
();
}
PyAggregateState
::
PyAggregateState
(
const
PyAggregateState
&
s
)
:
mColumns
(
s
.
mColumns
),
mFuncSpecs
(
s
.
mFuncSpecs
)
{
setupAggregateFuncs
();
}
void
PyAggregateState
::
setupAggregateFuncs
()
{
for
(
auto
f
:
mFuncSpecs
)
{
switch
(
f
)
{
case
AggrFuncType
::
GroupID
:
mAggrFuncs
.
push_back
(
new
AggrIdentity
<
std
::
string
>
());
break
;
case
AggrFuncType
::
IntSum
:
mAggrFuncs
.
push_back
(
new
AggrSum
<
int
>
());
break
;
...
...
@@ -71,20 +83,105 @@ void PyAggregateState::init() {
}
}
void
PyAggregateState
::
iterateForKey
(
const
PyTuplePtr
&
tp
,
const
std
::
string
&
key
,
AggrStatePtr
state
,
const
bool
outdated
)
{
auto
tup
=
get
<
0
>
(
tp
);
for
(
std
::
size_t
i
=
0
;
i
<
state
->
mFuncSpecs
.
size
();
i
++
)
{
auto
pyObj
=
tup
[
state
->
mColumns
[
i
]];
switch
(
state
->
mFuncSpecs
[
i
])
{
case
AggrFuncType
::
GroupID
:
{
AggrIdentity
<
std
::
string
>
*
aggr
=
dynamic_cast
<
AggrIdentity
<
std
::
string
>*>
(
state
->
mAggrFuncs
[
i
]);
aggr
->
iterate
(
key
,
outdated
);
break
;
}
case
AggrFuncType
::
IntSum
:
{
AggrSum
<
int
>
*
aggr
=
dynamic_cast
<
AggrSum
<
int
>*>
(
state
->
mAggrFuncs
[
i
]);
int
val
=
bp
::
extract
<
int
>
(
pyObj
);
aggr
->
iterate
(
val
,
outdated
);
break
;
}
case
AggrFuncType
::
DoubleSum
:
{
AggrSum
<
double
>
*
aggr
=
dynamic_cast
<
AggrSum
<
double
>*>
(
state
->
mAggrFuncs
[
i
]);
double
val
=
bp
::
extract
<
double
>
(
pyObj
);
aggr
->
iterate
(
val
,
outdated
);
break
;
}
case
AggrFuncType
::
Count
:
{
AggrCount
<
int
,
int
>
*
aggr
=
dynamic_cast
<
AggrCount
<
int
,
int
>*>
(
state
->
mAggrFuncs
[
i
]);
aggr
->
iterate
(
1
,
outdated
);
break
;
}
case
AggrFuncType
::
IntAvg
:
{
AggrAvg
<
int
,
int
>
*
aggr
=
dynamic_cast
<
AggrAvg
<
int
,
int
>*>
(
state
->
mAggrFuncs
[
i
]);
aggr
->
iterate
(
1
,
outdated
);
break
;
}
case
AggrFuncType
::
DoubleAvg
:
{
AggrAvg
<
double
,
double
>
*
aggr
=
dynamic_cast
<
AggrAvg
<
double
,
double
>*>
(
state
->
mAggrFuncs
[
i
]);
aggr
->
iterate
(
1
,
outdated
);
break
;
}
case
AggrFuncType
::
IntMin
:
{
AggrMinMax
<
int
,
std
::
less
<
int
>>
*
aggr
=
dynamic_cast
<
AggrMinMax
<
int
,
std
::
less
<
int
>>*>
(
state
->
mAggrFuncs
[
i
]);
int
val
=
bp
::
extract
<
int
>
(
pyObj
);
aggr
->
iterate
(
val
,
outdated
);
break
;
}
case
AggrFuncType
::
DoubleMin
:
{
AggrMinMax
<
double
,
std
::
less
<
double
>>
*
aggr
=
dynamic_cast
<
AggrMinMax
<
double
,
std
::
less
<
double
>>*>
(
state
->
mAggrFuncs
[
i
]);
double
val
=
bp
::
extract
<
double
>
(
pyObj
);
aggr
->
iterate
(
val
,
outdated
);
break
;
}
case
AggrFuncType
::
StringMin
:
{
AggrMinMax
<
std
::
string
,
std
::
less
<
std
::
string
>>
*
aggr
=
dynamic_cast
<
AggrMinMax
<
std
::
string
,
std
::
less
<
std
::
string
>>*>
(
state
->
mAggrFuncs
[
i
]);
std
::
string
val
=
bp
::
extract
<
std
::
string
>
(
pyObj
);
aggr
->
iterate
(
val
,
outdated
);
break
;
}
case
AggrFuncType
::
IntMax
:
{
AggrMinMax
<
int
,
std
::
greater
<
int
>>
*
aggr
=
dynamic_cast
<
AggrMinMax
<
int
,
std
::
greater
<
int
>>*>
(
state
->
mAggrFuncs
[
i
]);
int
val
=
bp
::
extract
<
int
>
(
pyObj
);
aggr
->
iterate
(
val
,
outdated
);
break
;
}
case
AggrFuncType
::
DoubleMax
:
{
AggrMinMax
<
double
,
std
::
greater
<
double
>>
*
aggr
=
dynamic_cast
<
AggrMinMax
<
double
,
std
::
greater
<
double
>>*>
(
state
->
mAggrFuncs
[
i
]);
double
val
=
bp
::
extract
<
double
>
(
pyObj
);
aggr
->
iterate
(
val
,
outdated
);
break
;
}
case
AggrFuncType
::
StringMax
:
{
AggrMinMax
<
std
::
string
,
std
::
greater
<
std
::
string
>>
*
aggr
=
dynamic_cast
<
AggrMinMax
<
std
::
string
,
std
::
greater
<
std
::
string
>>*>
(
state
->
mAggrFuncs
[
i
]);
std
::
string
val
=
bp
::
extract
<
std
::
string
>
(
pyObj
);
aggr
->
iterate
(
val
,
outdated
);
break
;
}
}
}
}
void
PyAggregateState
::
iterate
(
const
PyTuplePtr
&
tp
,
AggrStatePtr
state
,
const
bool
outdated
)
{
auto
tup
=
get
<
0
>
(
tp
);
for
(
std
::
size_t
i
=
0
;
i
<
state
->
mFuncSpecs
.
size
();
i
++
)
{
auto
pyObj
=
tup
[
state
->
mColumns
[
i
]];
switch
(
state
->
mFuncSpecs
[
i
])
{
case
AggrFuncType
::
IntSum
:
{
AggrSum
<
int
>
*
aggr
=
dynamic_cast
<
AggrSum
<
int
>*>
(
state
->
mAggrFuncs
[
i
]);
int
val
=
bp
::
extract
<
int
>
(
tup
[
i
]
);
int
val
=
bp
::
extract
<
int
>
(
pyObj
);
aggr
->
iterate
(
val
,
outdated
);
break
;
}
case
AggrFuncType
::
DoubleSum
:
{
AggrSum
<
double
>
*
aggr
=
dynamic_cast
<
AggrSum
<
double
>*>
(
state
->
mAggrFuncs
[
i
]);
double
val
=
bp
::
extract
<
double
>
(
tup
[
i
]
);
double
val
=
bp
::
extract
<
double
>
(
pyObj
);
aggr
->
iterate
(
val
,
outdated
);
break
;
}
...
...
@@ -106,42 +203,42 @@ void PyAggregateState::iterate(const PyTuplePtr& tp,
case
AggrFuncType
::
IntMin
:
{
AggrMinMax
<
int
,
std
::
less
<
int
>>
*
aggr
=
dynamic_cast
<
AggrMinMax
<
int
,
std
::
less
<
int
>>*>
(
state
->
mAggrFuncs
[
i
]);
int
val
=
bp
::
extract
<
int
>
(
tup
[
i
]
);
int
val
=
bp
::
extract
<
int
>
(
pyObj
);
aggr
->
iterate
(
val
,
outdated
);
break
;
}
case
AggrFuncType
::
DoubleMin
:
{
AggrMinMax
<
double
,
std
::
less
<
double
>>
*
aggr
=
dynamic_cast
<
AggrMinMax
<
double
,
std
::
less
<
double
>>*>
(
state
->
mAggrFuncs
[
i
]);
double
val
=
bp
::
extract
<
double
>
(
tup
[
i
]
);
double
val
=
bp
::
extract
<
double
>
(
pyObj
);
aggr
->
iterate
(
val
,
outdated
);
break
;
}
case
AggrFuncType
::
StringMin
:
{
AggrMinMax
<
std
::
string
,
std
::
less
<
std
::
string
>>
*
aggr
=
dynamic_cast
<
AggrMinMax
<
std
::
string
,
std
::
less
<
std
::
string
>>*>
(
state
->
mAggrFuncs
[
i
]);
std
::
string
val
=
bp
::
extract
<
std
::
string
>
(
tup
[
i
]
);
std
::
string
val
=
bp
::
extract
<
std
::
string
>
(
pyObj
);
aggr
->
iterate
(
val
,
outdated
);
break
;
}
case
AggrFuncType
::
IntMax
:
{
AggrMinMax
<
int
,
std
::
greater
<
int
>>
*
aggr
=
dynamic_cast
<
AggrMinMax
<
int
,
std
::
greater
<
int
>>*>
(
state
->
mAggrFuncs
[
i
]);
int
val
=
bp
::
extract
<
int
>
(
tup
[
i
]
);
int
val
=
bp
::
extract
<
int
>
(
pyObj
);
aggr
->
iterate
(
val
,
outdated
);
break
;
}
case
AggrFuncType
::
DoubleMax
:
{
AggrMinMax
<
double
,
std
::
greater
<
double
>>
*
aggr
=
dynamic_cast
<
AggrMinMax
<
double
,
std
::
greater
<
double
>>*>
(
state
->
mAggrFuncs
[
i
]);
double
val
=
bp
::
extract
<
double
>
(
tup
[
i
]
);
double
val
=
bp
::
extract
<
double
>
(
pyObj
);
aggr
->
iterate
(
val
,
outdated
);
break
;
}
case
AggrFuncType
::
StringMax
:
{
AggrMinMax
<
std
::
string
,
std
::
greater
<
std
::
string
>>
*
aggr
=
dynamic_cast
<
AggrMinMax
<
std
::
string
,
std
::
greater
<
std
::
string
>>*>
(
state
->
mAggrFuncs
[
i
]);
std
::
string
val
=
bp
::
extract
<
std
::
string
>
(
tup
[
i
]
);
std
::
string
val
=
bp
::
extract
<
std
::
string
>
(
pyObj
);
aggr
->
iterate
(
val
,
outdated
);
break
;
}
...
...
@@ -153,6 +250,11 @@ PyTuplePtr PyAggregateState::finalize(AggrStatePtr state) {
bp
::
list
seq
;
for
(
std
::
size_t
i
=
0
;
i
<
state
->
mFuncSpecs
.
size
();
i
++
)
{
switch
(
state
->
mFuncSpecs
[
i
])
{
case
AggrFuncType
::
GroupID
:
{
AggrIdentity
<
std
::
string
>
*
aggr
=
dynamic_cast
<
AggrIdentity
<
std
::
string
>*>
(
state
->
mAggrFuncs
[
i
]);
seq
.
append
(
aggr
->
value
());
break
;
}
case
AggrFuncType
::
IntSum
:
{
AggrSum
<
int
>
*
aggr
=
dynamic_cast
<
AggrSum
<
int
>*>
(
state
->
mAggrFuncs
[
i
]);
seq
.
append
(
aggr
->
value
());
...
...
src/python/PyTopology.cpp
View file @
5e0c076f
...
...
@@ -84,8 +84,10 @@ PyPipe PyPipe::assignTimestamps(bp::object fun) {
PyPipe
PyPipe
::
keyBy
(
bp
::
object
fun
)
{
auto
pipe
=
boost
::
get
<
TuplePipe
&>
(
pipeImpl
);
return
PyPipe
(
pipe
.
keyBy
<
bp
::
object
>
([
fun
](
auto
tp
)
{
return
fun
(
get
<
0
>
(
tp
));
return
PyPipe
(
pipe
.
keyBy
<
std
::
string
>
([
fun
](
auto
tp
)
->
std
::
string
{
bp
::
object
res
=
fun
(
get
<
0
>
(
tp
));
const
char
*
s
=
bp
::
extract
<
const
char
*>
(
bp
::
str
(
res
));
return
std
::
string
(
s
);
// bp::extract<std::string>(bp::str(fun(get<0>(tp))));
}));
}
...
...
@@ -102,6 +104,24 @@ PyPipe PyPipe::aggregate(bp::list columns, bp::list aggrFuncs) {
PyAggregateState
::
iterate
));
}
PyPipe
PyPipe
::
groupBy
(
bp
::
list
columns
,
bp
::
list
aggrFuncs
)
{
std
::
vector
<
int
>
columnVec
;
std
::
vector
<
AggrFuncType
>
funcVec
;
columnVec
.
push_back
(
0
);
funcVec
.
push_back
(
AggrFuncType
::
GroupID
);
for
(
int
i
=
0
;
i
<
bp
::
len
(
columns
);
++
i
)
{
columnVec
.
push_back
(
bp
::
extract
<
int
>
(
columns
[
i
]));
funcVec
.
push_back
(
bp
::
extract
<
AggrFuncType
>
(
aggrFuncs
[
i
]));
}
auto
pipe
=
boost
::
get
<
TuplePipe
&>
(
pipeImpl
);
auto
state
=
std
::
make_shared
<
PyAggregateState
>
(
columnVec
,
funcVec
);
return
PyPipe
(
pipe
.
groupBy
<
PyTuplePtr
,
PyAggregateState
,
std
::
string
>
(
state
,
PyAggregateState
::
create
,
PyAggregateState
::
finalize
,
PyAggregateState
::
iterateForKey
));
}
PyPipe
PyPipe
::
print
()
{
/*
auto pipe = boost::get<TuplePipe&>(pipeImpl);
...
...
@@ -149,6 +169,7 @@ BOOST_PYTHON_MODULE(pyfabric) {
.
def
(
"assign_timestamps"
,
&
pfabric
::
PyPipe
::
assignTimestamps
)
.
def
(
"keyBy"
,
&
pfabric
::
PyPipe
::
keyBy
)
.
def
(
"aggregate"
,
&
pfabric
::
PyPipe
::
aggregate
)
.
def
(
"groupby_key"
,
&
pfabric
::
PyPipe
::
groupBy
)
.
def
(
"sliding_window"
,
&
pfabric
::
PyPipe
::
slidingWindow
)
.
def
(
"notify"
,
&
pfabric
::
PyPipe
::
notify
)
.
def
(
"print"
,
&
pfabric
::
PyPipe
::
print
)
...
...
src/python/PyTopology.hpp
View file @
5e0c076f
...
...
@@ -34,25 +34,39 @@ enum class AggrFuncType {
Count
,
IntAvg
,
DoubleAvg
,
IntMin
,
DoubleMin
,
StringMin
,
IntMax
,
DoubleMax
,
StringMax
IntMax
,
DoubleMax
,
StringMax
,
GroupID
};
/// We handle only tuples consisting of a single field that represents
/// a Python tuple.
typedef
TuplePtr
<
bp
::
object
>
PyTuplePtr
;
struct
PyAggregateState
:
public
AggregateStateBase
<
PyTuplePtr
>
{
class
PyAggregateState
:
public
AggregateStateBase
<
PyTuplePtr
>
{
public:
typedef
std
::
shared_ptr
<
PyAggregateState
>
AggrStatePtr
;
PyAggregateState
()
{}
PyAggregateState
(
const
std
::
vector
<
int
>&
cols
,
const
std
::
vector
<
AggrFuncType
>&
funcs
);
PyAggregateState
(
const
PyAggregateState
&
s
);
virtual
void
init
();
static
void
iterate
(
const
PyTuplePtr
&
tp
,
AggrStatePtr
state
,
const
bool
outdated
);
static
AggrStatePtr
create
(
AggrStatePtr
state
)
{
PyAggregateState
*
self
=
state
.
get
();
return
std
::
make_shared
<
PyAggregateState
>
(
PyAggregateState
(
*
self
));
}
static
void
iterate
(
const
PyTuplePtr
&
tp
,
AggrStatePtr
state
,
const
bool
outdated
);
static
void
iterateForKey
(
const
PyTuplePtr
&
tp
,
const
std
::
string
&
key
,
AggrStatePtr
state
,
const
bool
outdated
);
static
PyTuplePtr
finalize
(
AggrStatePtr
state
);
private:
void
setupAggregateFuncs
();
std
::
vector
<
int
>
mColumns
;
std
::
vector
<
AggrFuncType
>
mFuncSpecs
;
std
::
vector
<
AggregateFuncBasePtr
>
mAggrFuncs
;
...
...
@@ -180,6 +194,8 @@ struct PyPipe {
PyPipe
aggregate
(
bp
::
list
columns
,
bp
::
list
aggrFuncs
);
PyPipe
groupBy
(
bp
::
list
columns
,
bp
::
list
aggrFuncs
);
/**
* @brief Creates a print operator.
*
...
...
src/qop/AggregateStateBase.hpp
View file @
5e0c076f
...
...
@@ -112,7 +112,7 @@ struct AggrStateTraits : std::false_type{};
* @tparam Aggr1Col
* the field number (0..n) in the input tuple used for aggregation
*/
template
<
typename
StreamElement
,
typename
Aggr1Func
,
int
Aggr1Col
>
template
<
typename
StreamElement
,
typename
Aggr1Func
,
int
Aggr1Col
,
typename
KeyType
=
DefaultKeyType
>
class
Aggregator1
:
public
AggregateStateBase
<
StreamElement
>
{
Aggr1Func
aggr1_
;
...
...
@@ -125,7 +125,7 @@ public:
/**
* Typedef for a pointer to the aggregation state.
*/
typedef
std
::
shared_ptr
<
Aggregator1
<
StreamElement
,
Aggr1Func
,
Aggr1Col
>>
AggrStatePtr
;
typedef
std
::
shared_ptr
<
Aggregator1
<
StreamElement
,
Aggr1Func
,
Aggr1Col
,
KeyType
>>
AggrStatePtr
;
/**
* Create a new aggregation state instance.
...
...
@@ -147,6 +147,10 @@ public:
* @param state the aggregate state object
* @param outdated true if the tuple is outdated
*/
static
void
iterateForKey
(
const
StreamElement
&
tp
,
const
KeyType
&
,
AggrStatePtr
state
,
const
bool
outdated
)
{
state
->
aggr1_
.
iterate
(
getAttribute
<
Aggr1Col
>
(
*
tp
),
outdated
);
}
static
void
iterate
(
const
StreamElement
&
tp
,
AggrStatePtr
state
,
const
bool
outdated
)
{
state
->
aggr1_
.
iterate
(
getAttribute
<
Aggr1Col
>
(
*
tp
),
outdated
);
}
...
...
@@ -163,8 +167,8 @@ public:
}
};
template
<
typename
StreamElement
,
typename
Aggr1Func
,
int
Aggr1Col
>
struct
AggrStateTraits
<
Aggregator1
<
StreamElement
,
Aggr1Func
,
Aggr1Col
>>
:
std
::
true_type
{};
template
<
typename
StreamElement
,
typename
Aggr1Func
,
int
Aggr1Col
,
typename
KeyType
>
struct
AggrStateTraits
<
Aggregator1
<
StreamElement
,
Aggr1Func
,
Aggr1Col
,
KeyType
>>
:
std
::
true_type
{};
/**
* Aggregator2 represents the aggregation state for two aggregation
...
...
@@ -186,7 +190,8 @@ struct AggrStateTraits<Aggregator1<StreamElement, Aggr1Func, Aggr1Col>> : std::t
template
<
typename
StreamElement
,
typename
Aggr1Func
,
int
Aggr1Col
,
typename
Aggr2Func
,
int
Aggr2Col
typename
Aggr2Func
,
int
Aggr2Col
,
typename
KeyType
=
DefaultKeyType
>
class
Aggregator2
:
public
AggregateStateBase
<
StreamElement
>
{
Aggr1Func
aggr1_
;
...
...
@@ -202,7 +207,7 @@ public:
* Typedef for a pointer to the aggregation state.
*/
typedef
std
::
shared_ptr
<
Aggregator2
<
StreamElement
,
Aggr1Func
,
Aggr1Col
,
Aggr2Func
,
Aggr2Col
>>
AggrStatePtr
;
Aggr2Func
,
Aggr2Col
,
KeyType
>>
AggrStatePtr
;
/**
* Create a new aggregation state instance.
...
...
@@ -225,6 +230,11 @@ public:
* @param state the aggregate state object
* @param outdated true if the tuple is outdated
*/
static
void
iterateForKey
(
const
StreamElement
&
tp
,
const
KeyType
&
,
AggrStatePtr
state
,
const
bool
outdated
)
{
state
->
aggr1_
.
iterate
(
getAttribute
<
Aggr1Col
>
(
*
tp
),
outdated
);
state
->
aggr2_
.
iterate
(
getAttribute
<
Aggr2Col
>
(
*
tp
),
outdated
);
}
static
void
iterate
(
const
StreamElement
&
tp
,
AggrStatePtr
state
,
const
bool
outdated
)
{
state
->
aggr1_
.
iterate
(
getAttribute
<
Aggr1Col
>
(
*
tp
),
outdated
);
state
->
aggr2_
.
iterate
(
getAttribute
<
Aggr2Col
>
(
*
tp
),
outdated
);
...
...
@@ -245,11 +255,12 @@ public:
template
<
typename
StreamElement
,
typename
Aggr1Func
,
int
Aggr1Col
,
typename
Aggr2Func
,
int
Aggr2Col
typename
Aggr2Func
,
int
Aggr2Col
,
typename
KeyType
>
struct
AggrStateTraits
<
Aggregator2
<
StreamElement
,
struct
AggrStateTraits
<
Aggregator2
<
StreamElement
,
Aggr1Func
,
Aggr1Col
,
Aggr2Func
,
Aggr2Col
Aggr2Func
,
Aggr2Col
,
KeyType
>>
:
std
::
true_type
{};
/**
...
...
@@ -345,7 +356,7 @@ template <
typename
Aggr2Func
,
int
Aggr2Col
,
typename
Aggr3Func
,
int
Aggr3Col
>
struct
AggrStateTraits
<
Aggregator3
<
StreamElement
,
struct
AggrStateTraits
<
Aggregator3
<
StreamElement
,
Aggr1Func
,
Aggr1Col
,
Aggr2Func
,
Aggr2Col
,
Aggr3Func
,
Aggr3Col
...
...
@@ -456,7 +467,7 @@ template <
typename
Aggr3Func
,
int
Aggr3Col
,
typename
Aggr4Func
,
int
Aggr4Col
>
struct
AggrStateTraits
<
Aggregator4
<
StreamElement
,
struct
AggrStateTraits
<
Aggregator4
<
StreamElement
,
Aggr1Func
,
Aggr1Col
,
Aggr2Func
,
Aggr2Col
,
Aggr3Func
,
Aggr3Col
,
...
...
src/qop/GroupedAggregation.hpp
View file @
5e0c076f
...
...
@@ -100,7 +100,10 @@ public:
*
* This function gets the incoming stream element, the aggregate state, and the boolean flag for outdated elements.
*/
typedef
std
::
function
<
void
(
const
InputStreamElement
&
,
AggregateStatePtr
,
const
bool
)
>
IterateFunc
;
typedef
std
::
function
<
void
(
const
InputStreamElement
&
,
const
KeyType
&
,
AggregateStatePtr
,
const
bool
)
>
IterateFunc
;
typedef
std
::
function
<
AggregateStatePtr
(
AggregateStatePtr
)
>
FactoryFunc
;
protected:
/// a mutex for protecting aggregation processing from concurrent sources
...
...
@@ -149,6 +152,50 @@ public:
mLastTriggerTime
(
0
),
mTriggerType
(
tType
),
mCounter
(
0
)
{
}
/**
* @brief Create a new instance of the GroupedAggregation operator.
*
* Create a new instance of the operator for computing aggregates per groups.
* The behaviour is defined by the trigger type (all, timestamp, count - see PipeFabricTypes.hpp)
* and the trigger interval. In contrast to the constructor above, a factory object
* for the aggregation state is provided by the caller.
*
* @param factory
* a factory object that is used to create new instances representin a new group
* @param factory_fun
* a function pointer to create a new instance of a group state from the factory object
* @param groupby_fun
* a function pointer for getting the group id
* @param final_fun
* a function pointer to the aggregation function
* @param it_fun
* a function pointer to an iteration function called for each incoming tuple
* @param tType
* the trigger type specifying when an aggregation tuple is produced
* (TriggerAll = for each incoming tuple,
* TriggerByCount = as soon as a number of tuples (tInterval) are processed,
* TriggerByTime = after every tInterval seconds,
* TriggerByTimestamp = as for TriggerByTime but based on timestamp of the tuples
* and not based on real time)
* @param tInterval
* the time interval in seconds to produce aggregation tuples (for trigger by timestamp)
* or in the number of tuples (for trigger by count)
*/
GroupedAggregation
(
AggregateStatePtr
&
factory
,
FactoryFunc
factory_fun
,
GroupByFunc
groupby_fun
,
FinalFunc
final_fun
,
IterateFunc
it_fun
,
AggregationTriggerType
tType
=
TriggerAll
,
const
unsigned
int
tInterval
=
0
)
:
mGroupByFunc
(
groupby_fun
),
mIterateFunc
(
it_fun
),
mFinalFunc
(
final_fun
),
mTriggerInterval
(
tInterval
),
mNotifier
(
tInterval
>
0
&&
tType
==
TriggerByTime
?
new
TriggerNotifier
(
std
::
bind
(
&
GroupedAggregation
::
notificationCallback
,
this
),
tInterval
)
:
nullptr
),
mLastTriggerTime
(
0
),
mTriggerType
(
tType
),
mCounter
(
0
),
mFactory
(
factory
),
mFactoryFunc
(
factory_fun
)
{
}
/**
* @brief Create a new instance of the GroupedAggregation operator.
*
...
...
@@ -283,11 +330,12 @@ private:
const
Timestamp
elementTime
=
mTimestampExtractor
!=
nullptr
?
mTimestampExtractor
(
data
)
:
0
;
// create a new aggregation state
AggregateStatePtr
newAggrState
=
std
::
make_shared
<
AggregateState
>
();
// if a factory object was provided we can call its create method
AggregateStatePtr
newAggrState
=
mFactory
?
mFactoryFunc
(
mFactory
)
:
std
::
make_shared
<
AggregateState
>
();
newAggrState
->
setTimestamp
(
elementTime
);
// ... call the iterate function
mIterateFunc
(
data
,
newAggrState
,
outdated
);
mIterateFunc
(
data
,
grpKey
,
newAggrState
,
outdated
);
// ... and insert it into the hashtable
mAggregateTable
.
insert
({
grpKey
,
newAggrState
});
...
...
@@ -330,7 +378,7 @@ private:
aggrState
->
setTimestamp
(
elementTime
);
aggrState
->
updateCounter
(
outdated
?
-
1
:
1
);
const
bool
outdatedAggregate
=
(
aggrState
->
getCounter
()
==
0
);
mIterateFunc
(
data
,
aggrState
,
outdated
);
mIterateFunc
(
data
,
grpKey
,
aggrState
,
outdated
);
// 3. directly publish the new aggregation result if no sliding window was specified
// TODO: should an outdated tuple trigger also an aggregation tuple??
...
...
@@ -422,6 +470,8 @@ private:
Timestamp
mLastTriggerTime
;
//!< the timestamp of the last aggregate publishing
AggregationTriggerType
mTriggerType
;
//!< the type of trigger activating the publishing of an aggregate value
unsigned
int
mCounter
;
//!< the number of tuples processed since the last aggregate publishing
AggregateStatePtr
mFactory
;
FactoryFunc
mFactoryFunc
;
};
}
/* end namespace pfabric */
...
...
src/test/GroupedAggregationTest.cpp
View file @
5e0c076f
...
...
@@ -86,7 +86,7 @@ TEST_CASE( "Compute a simple punctuation based groupby with aggregates", "[Group
return
tp
;
},
/* iterate function */
[
&
](
const
InTuplePtr
&
tp
,
MyAggrStatePtr
myState
,
const
bool
outdated
)
{
[
&
](
const
InTuplePtr
&
tp
,
const
int
&
,
MyAggrStatePtr
myState
,
const
bool
outdated
)
{
myState
->
group1_
=
tp
->
getAttribute
<
0
>
();
myState
->
sum1_
.
iterate
(
tp
->
getAttribute
<
1
>
(),
outdated
);
myState
->
avg2_
.
iterate
(
tp
->
getAttribute
<
1
>
(),
outdated
);
...
...
@@ -160,7 +160,7 @@ TEST_CASE( "Compute a groupby with incremental min/max aggregates", "[GroupedAgg
return
tp
;
},
/* iterate function */
[
&
](
const
InTuplePtr
&
tp
,
MyAggrState2Ptr
myState
,
const
bool
outdated
)
{
[
&
](
const
InTuplePtr
&
tp
,
const
int
&
,
MyAggrState2Ptr
myState
,
const
bool
outdated
)
{
myState
->
group1_
=
tp
->
getAttribute
<
0
>
();
myState
->
min1_
.
iterate
(
tp
->
getAttribute
<
1
>
(),
outdated
);
myState
->
max2_
.
iterate
(
tp
->
getAttribute
<
1
>
(),
outdated
);
...
...
src/test/TopologyGroupByTest.cpp
View file @
5e0c076f
...
...
@@ -20,7 +20,8 @@ using namespace ns_types;
TEST_CASE
(
"Building and running a topology with standard grouping"
,
"[GroupBy]"
)
{
typedef
TuplePtr
<
std
::
string
,
int
>
MyTuplePtr
;
typedef
TuplePtr
<
std
::
string
,
int
>
AggrRes
;
typedef
Aggregator2
<
MyTuplePtr
,
AggrIdentity
<
std
::
string
>
,
0
,
AggrCount
<
int
,
int
>
,
1
>
AggrState
;
typedef
Aggregator2
<
MyTuplePtr
,
AggrIdentity
<
std
::
string
>
,
0
,
AggrCount
<
int
,
int
>
,
1
,
std
::
string
>
AggrState
;
std
::
map
<
std
::
string
,
int
>
results
;
Topology
t
;
...
...
@@ -56,12 +57,12 @@ public:
}
};
TEST_CASE
(
"Building and running a topology with simple unpartitioned grouping"
,
TEST_CASE
(
"Building and running a topology with simple unpartitioned grouping"
,
"[Simple unpartitioned Grouping]"
)
{
typedef
TuplePtr
<
unsigned
long
,
double
>
MyTuplePtr
;
typedef
TuplePtr
<
double
>
AggregationResultPtr
;
typedef
Aggregator1
<
MyTuplePtr
,
AggrSum
<
double
>
,
1
>
AggrStateSum
;
StreamGenerator
<
MyTuplePtr
>::
Generator
streamGen
([](
unsigned
long
n
)
->
MyTuplePtr
{
if
(
n
<
500
)
return
makeTuplePtr
((
unsigned
long
)
0
,
(
double
)
n
+
0.5
);
else
return
makeTuplePtr
((
unsigned
long
)
n
,
(
double
)
n
+
0.5
);
...
...
@@ -96,24 +97,24 @@ TEST_CASE("Building and running a topology with simple unpartitioned grouping",
}
}
TEST_CASE
(
"Building and running a topology with unpartitioned grouping"
,
TEST_CASE
(
"Building and running a topology with unpartitioned grouping"
,