Commit 03948596 authored by Kai-Uwe Sattler's avatar Kai-Uwe Sattler
Browse files

result type of aggregate and groupBy derived from AggrState class

parent 93b1bb35
......@@ -374,7 +374,7 @@ separate thread to retrieve tuples from the queue and sent them downstream. In t
#### aggregate ####
`Pipe<Tout> Pipe::aggregate<Tout, State>(tType, tInterval)`
`Pipe<Tout> Pipe::aggregate<State>(tType, tInterval)`
`Pipe<Tout> Pipe::aggregate<Tout, State>(finalFun, iterFun, tType, tInterval)`
calculates a set of aggregates over the stream of type `Tin`, possibly supported by a window. The aggregates are represented by tuples
......@@ -398,16 +398,15 @@ typedef Aggregator3<
> AggrState;
```
With the help of this state class, `aggregate` can be used. However, we have to define the output type of the
aggregate which consists in this case of three `int` values (for avg, count, sum):
With the help of this state class, `aggregate` can be used. Note that we don't have to define the output
type - it is derived from the `AggrState` class:
```C++
typedef TuplePtr<Tuple<int, int, int>> OutTuple;
Topology t;
auto s = t.newStreamFromFile("data.csv")
.extract<InTuple>(',')
.aggregate<OutTuple, AggrState>()
.aggregate<AggrState>()
...
```
......@@ -417,7 +416,7 @@ tuple and a `finalize` function for producing the final result tuple.
#### groupBy #####
`Pipe<Tout> Pipe::groupBy<Tout, State, KeyType>(tType, tInterval)`
`Pipe<Tout> Pipe::groupBy<State, KeyType>(tType, tInterval)`
`Pipe<Tout> Pipe::groupBy<Tout, State, KeyType>(finalFun, iterFun, tType, tInterval)`
The `groupBy` operator implements the relational grouping on the key column and applies an incremental
......@@ -431,14 +430,14 @@ class to store the grouping value in the aggregator class.
```C++
typedef TuplePtr<Tuple<int, int, int>> Tin;
typedef TuplePtr<Tuple<int, int> > AggrRes; // group_id, sum(col1)
typedef TuplePtr<Tuple<int, int> > AggrRes; // group_id, sum(col1), but not needed here!
typedef Aggregator2<AggrRes, AggrIdentity<int>, 0, AggrSum<int>, 1> AggrState;
Topology t;
auto s = t.newStreamFromFile("data.csv")
.extract<Tin>(',')
.keyBy<0, int>()
.groupBy<AggrRes, AggrState, int>()
.groupBy<AggrState, int>()
```
#### join ####
......
......@@ -52,7 +52,7 @@ int main(int argc, char **argv) {
auto s = t->newStreamFromREST(8099, "^/publish$", RESTSource::POST_METHOD)
.extractJson<InTuplePtr>({"key", "data"})
.slidingWindow(WindowParams::RowWindow, 10)
.aggregate<ResultTuplePtr, MyAggrState> ()
.aggregate<MyAggrState> ()
.print(std::cout);
t->start();
......
......@@ -175,7 +175,7 @@ void TopologyGroupByTest(benchmark::State& state) {
auto s = t.newStreamFromFile("file.csv")
.extract<T1>(',')
.keyBy<int>([](auto tp) { return get<0>(tp); })
.groupBy<T2, AggrStateSum, int>()
.groupBy<AggrStateSum, int>()
.map<T2>([](auto tp, bool outdated) -> T2 {
double math = doMath(get<0>(tp));
return makeTuplePtr(math);
......@@ -207,7 +207,7 @@ void TopologyPartitionedGroupByTest(benchmark::State& state) {
.extract<T1>(',')
.keyBy<int>([](auto tp) { return get<0>(tp); })
.partitionBy([](auto tp) { return get<0>(tp) % 3; }, 3)
.groupBy<T2, AggrStateSum, int>()
.groupBy<AggrStateSum, int>()
.map<T2>([](auto tp, bool outdated) -> T2 {
double math = doMath(get<0>(tp));
return makeTuplePtr(math);
......
......@@ -24,7 +24,7 @@ int main(int argc, char **argv) {
auto s = t->newStreamFromREST(8099, "^/publish$", RESTSource::POST_METHOD)
.extractJson<InTuplePtr>({"key", "data"})
.slidingWindow(WindowParams::RowWindow, 10)
.aggregate<ResultTuplePtr, MyAggrState>()
.aggregate<MyAggrState>()
.notify([&](auto tp, bool outdated) { std::cout << tp << std::endl; });
// .print<ResultTuplePtr>(std::cout);
......
......@@ -891,11 +891,9 @@ class Pipe {
*
* // Aggregator1 defines already functions for finalize and iterate
* t->newStreamFrom...
* .aggregate<T2, MyAggrState> ()
* .aggregate<MyAggrState> ()
* @endcode
*
* @tparam Tout
* the result tuple type (usually a TuplePtr) for the operator.
* @tparam AggrState
* the type representing the aggregation state, which is a subclass of
* @c AggregationStateBase. There are predefined template classes
......@@ -907,11 +905,12 @@ class Pipe {
* the interval for producing aggregate tuples
* @return a new pipe
*/
template <typename Tout, typename AggrState>
Pipe<Tout> aggregate(
template <typename AggrState>
Pipe<typename AggrState::ResultTypePtr> aggregate(
AggregationTriggerType tType = TriggerAll,
const unsigned int tInterval = 0) throw(TopologyException) {
return aggregate<Tout, AggrState>(AggrState::finalize, AggrState::iterate,
static_assert(typename AggrStateTraits<AggrState>::type(), "aggregate requires an AggrState class");
return aggregate<typename AggrState::ResultTypePtr, AggrState>(AggrState::finalize, AggrState::iterate,
tType, tInterval);
}
......@@ -994,10 +993,7 @@ class Pipe {
supports
* window-based aggregation by handling delete tuples accordingly.
*
* @tparam Tin
* the input tuple type (usually a TuplePtr) for the operator.
* @tparam Tout
* the result tuple type (usually a TuplePtr) for the operator.
* @tparam AggrState
* the type representing the aggregation state, which is a subclass of
* @c AggregationStateBase. There are predefined template classes
* @c Aggregator1 ... @c AggregatorN which can be used directly here.
......@@ -1010,12 +1006,13 @@ class Pipe {
* the interval for producing aggregate tuples
* @return a new pipe
*/
template <typename Tout, typename AggrState,
template <typename AggrState,
typename KeyType = DefaultKeyType>
Pipe<Tout> groupBy(
Pipe<typename AggrState::ResultTypePtr> groupBy(
AggregationTriggerType tType = TriggerAll,
const unsigned int tInterval = 0) throw(TopologyException) {
return groupBy<Tout, AggrState, KeyType>(
static_assert(typename AggrStateTraits<AggrState>::type(), "groupBy requires an AggrState class");
return groupBy<typename AggrState::ResultTypePtr, AggrState, KeyType>(
AggrState::finalize, AggrState::iterate, tType, tInterval);
}
......
......@@ -88,6 +88,14 @@ public:
unsigned int mCounter; //< counter for aggregation
};
/**
 * @brief Compile-time marker for aggregation state classes.
 *
 * The primary template derives from @c std::false_type, so an arbitrary
 * type @c T is not treated as an aggregator class. A class opts in by
 * providing a specialization that derives from @c std::true_type.
 *
 * @tparam T
 *    the type to be checked
 */
template <typename T>
struct AggrStateTraits : std::false_type {
};
/**
* Aggregator1 represents the aggregation state for a single aggregation
* function.
......@@ -155,6 +163,9 @@ public:
}
};
/**
 * Specialization of AggrStateTraits that marks every instantiation of
 * @c Aggregator1 as an aggregation state class (derives from
 * @c std::true_type).
 */
template <typename Element, typename Func1, int Col1>
struct AggrStateTraits<Aggregator1<Element, Func1, Col1>> : std::true_type {
};
/**
* Aggregator2 represents the aggregation state for two aggregation
* functions.
......@@ -231,6 +242,16 @@ public:
}
};
/**
 * Specialization of AggrStateTraits that marks every instantiation of
 * @c Aggregator2 as an aggregation state class (derives from
 * @c std::true_type).
 */
template <typename Element,
          typename Func1, int Col1,
          typename Func2, int Col2>
struct AggrStateTraits<Aggregator2<Element, Func1, Col1, Func2, Col2>>
    : std::true_type {
};
/**
* Aggregator3 represents the aggregation state for three aggregation
* functions.
......@@ -318,6 +339,18 @@ public:
}
};
/**
 * Specialization of AggrStateTraits that marks every instantiation of
 * @c Aggregator3 as an aggregation state class (derives from
 * @c std::true_type).
 */
template <typename Element,
          typename Func1, int Col1,
          typename Func2, int Col2,
          typename Func3, int Col3>
struct AggrStateTraits<
    Aggregator3<Element, Func1, Col1, Func2, Col2, Func3, Col3>>
    : std::true_type {
};
/**
* Aggregator4 represents the aggregation state for four aggregation
* functions.
......@@ -415,6 +448,21 @@ public:
state->aggr3_.value(), state->aggr4_.value());
}
};
/**
 * Specialization of AggrStateTraits that marks every instantiation of
 * @c Aggregator4 as an aggregation state class (derives from
 * @c std::true_type).
 */
template <typename Element,
          typename Func1, int Col1,
          typename Func2, int Col2,
          typename Func3, int Col3,
          typename Func4, int Col4>
struct AggrStateTraits<
    Aggregator4<Element, Func1, Col1, Func2, Col2, Func3, Col3, Func4, Col4>>
    : std::true_type {
};
} /* end namespace pfabric */
......
......@@ -34,7 +34,7 @@ TEST_CASE("Building and running a topology with unpartitioned aggregation",
Topology t;
auto s = t.streamFromGenerator<MyTuplePtr>(streamGen, num)
.keyBy<0>()
.aggregate<AggregationResultPtr, AggrStateSum>()
.aggregate<AggrStateSum>()
.notify([&](auto tp, bool outdated) {
if (tuplesProcessed < num)
results.push_back(get<0>(tp));
......@@ -71,7 +71,7 @@ TEST_CASE("Building and running a topology with partitioned aggregation",
auto s = t.streamFromGenerator<MyTuplePtr>(streamGen, num)
.keyBy<0>()
.partitionBy([](auto tp) { return get<0>(tp) % 5; }, 5)
.aggregate<AggregationResultPtr, AggrStateSum>()
.aggregate<AggrStateSum>()
.merge() //TODO: Use new merge operator
.notify([&](auto tp, bool outdated) {
if (tuplesProcessed < num)
......@@ -89,4 +89,4 @@ TEST_CASE("Building and running a topology with partitioned aggregation",
REQUIRE(results.size() == num);
//REQUIRE(results[num-1] == 500000);
}
\ No newline at end of file
}
......@@ -29,7 +29,7 @@ TEST_CASE("Building and running a topology with standard grouping", "[GroupBy]")
return makeTuplePtr(key, (int)n);
}, 50)
.keyBy<0, std::string>()
.groupBy<AggrRes, AggrState, std::string>()
.groupBy<AggrState, std::string>()
.notify([&](auto tp, bool outdated) {
results[get<0>(tp)] = get<1>(tp);
});
......@@ -74,7 +74,7 @@ TEST_CASE("Building and running a topology with simple unpartitioned grouping",
Topology t;
auto s = t.streamFromGenerator<MyTuplePtr>(streamGen, num)
.keyBy<0>()
.groupBy<AggregationResultPtr, AggrStateSum, unsigned long>()
.groupBy<AggrStateSum, unsigned long>()
.notify([&](auto tp, bool outdated) {
if (tuplesProcessed < num)
results.push_back(get<0>(tp));
......
......@@ -230,7 +230,7 @@ TEST_CASE("Building and running a topology with grouping", "[GroupBy]") {
Topology t;
auto s = t.streamFromGenerator<T1>(streamGen, num)
.keyBy<int>([](auto tp) { return get<0>(tp); })
.groupBy<T2, AggrStateSum, int>()
.groupBy<AggrStateSum, int>()
.print(strm);
t.start(false);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment