Commit d8ce2009 authored by Kai-Uwe Sattler's avatar Kai-Uwe Sattler
Browse files

AggrIdentity and test case added

parent 575d8759
......@@ -358,7 +358,7 @@ of `int` values:
typedef TuplePtr<Tuple<int>> InTuple;
typedef Aggregator3<
InTuple, i // the input type for the aggregation
InTuple, // the input type for the aggregation
AggrAvg<int, int>,// an aggregate for calculating the average (input column type = int, output type = int)
0, // the values to be aggregated are taken from column 0
AggrCount<int>, // an aggregate for counting values (result type = int)
......@@ -387,8 +387,29 @@ tuple and a `finalize` function for producing the final result tuple.
#### groupBy #####
`Pipe<Tout> Pipe::groupBy(tType, tInterval)`
`Pipe<Tout> Pipe::groupBy(finalFun, iterFun, tType, tInterval)`
`Pipe<Tout> Pipe::groupBy<Tout, State, KeyType>(tType, tInterval)`
`Pipe<Tout> Pipe::groupBy<Tout, State, KeyType>(finalFun, iterFun, tType, tInterval)`
The `groupBy` operator implements the relational grouping on the key column and applies an incremental
aggregation on the individual groups. As in `aggregate` either one of the predefined `AggregatorN` class
or a user-defined class can be used to implement the `State` class. Compared to `aggregate` an additional
type parameter specifying the type of the grouping key is required.
The following example implements a simple grouping on the key column for calculating the sum per group.
In order to specify the key for grouping the `keyBy` operator is needed. Note, that we use the `AggrIdentity`
class to store the grouping value in the aggregator class.
typedef TuplePtr<Tuple<int, int, int>> Tin;
typedef TuplePtr<Tuple<int, int> > AggrRes; // group_id, sum(col1)
typedef Aggregator2<AggrRes, AggrIdentity<int>, 0, AggrSum<int>, 1> AggrState;
Topology t;
auto s = t.newStreamFromFile("data.csv")
.groupBy<AggrRes, AggrState, int>()
#### join ####
`Pipe<typename SHJoin<T, T2, KeyType>::ResultElement> Pipe::join(otherPipe, pred)`
......@@ -36,6 +36,7 @@
#include "aggr_functions/AggrMinMax.hpp"
#include "aggr_functions/AggrMRecent.hpp"
#include "aggr_functions/AggrSum.hpp"
#include "aggr_functions/AggrIdentity.hpp"
#endif /* AggregateFunctions_hpp_ */
* Copyright (c) 2014-17 The PipeFabric team,
* All Rights Reserved.
* This file is part of the PipeFabric package.
* PipeFabric is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License (GPL) as
* published by the Free Software Foundation; either version 2 of
* the License, or (at your option) any later version.
* This package is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; see the file LICENSE.
* If not you can find the GPL at
#ifndef AggrIdentity_hpp_
#define AggrIdentity_hpp_
#include "AggregateFunc.hpp"
#include <type_traits>
namespace pfabric {
* @brief A aggregation function that just keeps the last value.
* A aggregation function that keeps the last value which can be used
* to store the grouping value.
* @tparam T
* the type of the input/output argument
typename T
class AggrIdentity : public AggregateFunc< T, T > {
AggrIdentity() {
virtual void init() override {
virtual void iterate(T const& data, bool outdated = false) override {
mValue = data;
virtual T value() override {
return mValue;
T mValue;
#endif /* AggrIdentity_hpp_ */
......@@ -183,3 +183,23 @@ TEST_CASE("Calculate distinct count", "[AggregateFunc]") {
REQUIRE(dcount.value() == 110);
TEST_CASE("Test AggrIdentity function", "[AggregateFunc]") {
AggrIdentity<int> aggr1;
for (int i = 0; i < 10; i++)
REQUIRE(aggr1.value() == 9);
/* ------------------------------- */
AggrIdentity<std::string> aggr2;
std::vector<std::string> data = { "aaa", "bbb", "ccc", "ddd", "eee" };
for (int i = 0; i < data.size(); i++)
REQUIRE(aggr2.value() == "eee");
......@@ -17,6 +17,31 @@
using namespace pfabric;
using namespace ns_types;
TEST_CASE("Building and running a topology with standard grouping", "[GroupBy]") {
typedef TuplePtr<Tuple<std::string, int>> MyTuplePtr;
typedef TuplePtr<Tuple<std::string, int>> AggrRes;
typedef Aggregator2<AggrRes, AggrIdentity<std::string>, 0, AggrCount<int, int>, 1> AggrState;
std::map<std::string, int> results;
Topology t;
auto s = t.streamFromGenerator<MyTuplePtr>([](unsigned long n) {
std::string key = fmt::format("KEY#{0}", n % 5);
return makeTuplePtr(key, (int)n);
}, 50)
.keyBy<0, std::string>()
.groupBy<AggrRes, AggrState, std::string>()
.notify([&](auto tp, bool outdated) {
results[get<0>(tp)] = get<1>(tp);
REQUIRE(results.size() == 5);
for (auto iter : results) {
REQUIRE(iter.second == 10);
template<typename StreamElement>
class MyAggregateState: public AggregateStateBase<StreamElement> {
......@@ -226,4 +251,4 @@ TEST_CASE("Building and running a topology with partitioned grouping",
REQUIRE(finalResults[i][1] == finalResults[i-1][1]+1.0);
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment