Commit 3d470fed authored by Kai-Uwe Sattler's avatar Kai-Uwe Sattler
Browse files

aggregate added to Python interface

parent 1614c795
......@@ -280,6 +280,7 @@ link_directories(${PYTHON_LIBDIR})
set(PYTHON_SOURCES
python/PyTopology.cpp
python/PyAggregateState.cpp
)
else()
set (PYTHON_SOURCES "")
......
......@@ -1032,6 +1032,32 @@ class Pipe {
}
}
/**
 * @brief Creates an aggregation operator that works on a caller-supplied
 *        aggregate state object.
 *
 * Without partitioning a single Aggregation operator is created and added as
 * publisher. With partitioning one Aggregation operator per partition is
 * created and added as partitioned publisher.
 *
 * NOTE(review): in the partitioned case every operator is constructed with
 * the very same @c state pointer, i.e. the state object appears to be shared
 * between all partitions - confirm that this is intended.
 *
 * @tparam Tout
 *    the result type of the aggregation
 * @tparam AggrState
 *    the type of the aggregate state
 * @param[in] state
 *    pointer to the aggregate state handed to the operator(s)
 * @param[in] finalFun
 *    the function producing the final aggregation result
 * @param[in] iterFun
 *    the function called for each incoming tuple
 * @param[in] tType
 *    the trigger type passed on to the operator(s)
 * @param[in] tInterval
 *    the trigger interval passed on to the operator(s)
 * @return a new pipe
 */
template <typename Tout, typename AggrState>
Pipe<Tout> aggregate(
    typename AggrState::AggrStatePtr state,
    typename Aggregation<T, Tout, AggrState>::FinalFunc finalFun,
    typename Aggregation<T, Tout, AggrState>::IterateFunc iterFun,
    AggregationTriggerType tType = TriggerAll,
    const unsigned int tInterval = 0) noexcept(false) {
  using AggrOp = Aggregation<T, Tout, AggrState>;

  if (partitioningState != NoPartitioning) {
    // one operator instance per partition
    std::vector<std::shared_ptr<AggrOp>> partitionOps;
    for (auto p = 0u; p < numPartitions; p++) {
      partitionOps.push_back(
          std::make_shared<AggrOp>(state, finalFun, iterFun, tType, tInterval));
    }
    auto publisher = addPartitionedPublisher<AggrOp, T>(partitionOps);
    return Pipe<Tout>(dataflow, publisher, keyExtractor, timestampExtractor,
                      transactionIDExtractor, partitioningState, numPartitions);
  }

  // no partitioning: a single operator is sufficient
  auto aggrOp =
      std::make_shared<AggrOp>(state, finalFun, iterFun, tType, tInterval);
  auto publisher = addPublisher<AggrOp, DataSource<T>>(aggrOp);
  return Pipe<Tout>(dataflow, publisher, keyExtractor, timestampExtractor,
                    transactionIDExtractor, partitioningState, numPartitions);
}
/**
* @brief Creates an operator for calculating grouped aggregates over the
entire stream.
......
/*
* Copyright (c) 2014-17 The PipeFabric team,
* All Rights Reserved.
*
* This file is part of the PipeFabric package.
*
* PipeFabric is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License (GPL) as
* published by the Free Software Foundation; either version 2 of
* the License, or (at your option) any later version.
*
* This package is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; see the file LICENSE.
* If not you can find the GPL at http://www.gnu.org/copyleft/gpl.html
*/
#include "PyTopology.hpp"

#include <stdexcept>
using namespace pfabric;
namespace bp = boost::python;
/**
 * @brief Creates the aggregate state for the given specification.
 *
 * For every entry of @c funcs one aggregate function object is allocated and
 * stored in @c mAggrFuncs at the same index, so that @c mFuncSpecs[i] and
 * @c mAggrFuncs[i] always describe the same aggregate. The function objects
 * are allocated with @c new and kept as raw pointers.
 *
 * Only AggrFuncType::IntSum and AggrFuncType::Count are implemented. Any
 * other value is rejected: silently skipping it would shift all following
 * entries of @c mAggrFuncs and break the index correspondence that
 * iterate() and finalize() rely on.
 *
 * @param[in] cols
 *    the columns the aggregates refer to (one entry per aggregate)
 * @param[in] funcs
 *    the aggregate function types (one entry per aggregate)
 * @throws std::invalid_argument
 *    if @c funcs contains an aggregate function type that is not supported
 */
PyAggregateState::PyAggregateState(const std::vector<int>& cols,
    const std::vector<AggrFuncType>& funcs) : mColumns(cols), mFuncSpecs(funcs) {
  // Reserve up front so that push_back cannot throw after a successful new.
  mAggrFuncs.reserve(mFuncSpecs.size());
  for (auto f : mFuncSpecs) {
    switch (f) {
      case AggrFuncType::IntSum:
        mAggrFuncs.push_back(new AggrSum<int>());
        break;
      case AggrFuncType::Count:
        mAggrFuncs.push_back(new AggrCount<int, int>());
        break;
      default:
        // The destructor does not run for a partially constructed object,
        // so release what has been allocated so far before reporting.
        for (auto* func : mAggrFuncs) {
          delete func;
        }
        mAggrFuncs.clear();
        throw std::invalid_argument(
            "PyAggregateState: unsupported aggregate function type");
    }
  }
}
/**
 * @brief Calls init() on every aggregate function object of this state.
 */
void PyAggregateState::init() {
  for (auto it = mAggrFuncs.begin(); it != mAggrFuncs.end(); ++it) {
    (*it)->init();
  }
}
/**
 * @brief Feeds one tuple into all aggregate functions of the given state.
 *
 * The i-th aggregate function is selected via @c mFuncSpecs[i]. An IntSum
 * aggregate reads its input from the tuple field given by @c mColumns[i]
 * (the column configured for this aggregate), not from field @c i; a Count
 * aggregate does not read any tuple field.
 *
 * @param[in] tp
 *    the incoming tuple; its single field is the Python tuple
 * @param[in] state
 *    the aggregate state to update
 * @param[in] outdated
 *    flag forwarded unchanged to the aggregate functions
 */
void PyAggregateState::iterate(const PyTuplePtr& tp,
    AggrStatePtr state, const bool outdated) {
  auto tup = get<0>(tp);
  for (std::size_t i = 0; i < state->mFuncSpecs.size(); i++) {
    switch (state->mFuncSpecs[i]) {
      case AggrFuncType::IntSum: {
        AggrSum<int> *aggr = dynamic_cast<AggrSum<int>*>(state->mAggrFuncs[i]);
        // at() turns a missing column entry into an exception instead of an
        // out-of-range read.
        int val = bp::extract<int>(tup[state->mColumns.at(i)]);
        aggr->iterate(val, outdated);
        break;
      }
      case AggrFuncType::Count: {
        AggrCount<int, int> *aggr = dynamic_cast<AggrCount<int, int>*>(state->mAggrFuncs[i]);
        aggr->iterate(1, outdated);
        break;
      }
      default:
        // no aggregate function implemented for the remaining types
        break;
    }
  }
}
/**
 * @brief Collects the current values of all aggregate functions.
 *
 * The values of the IntSum and Count aggregates are appended to a Python
 * list in specification order; the list is converted into a Python tuple
 * which becomes the single field of the resulting tuple.
 *
 * @param[in] state
 *    the aggregate state to read from
 * @return a tuple pointer wrapping the Python tuple of aggregate values
 */
PyTuplePtr PyAggregateState::finalize(AggrStatePtr state) {
  bp::list results;
  for (std::size_t idx = 0; idx < state->mFuncSpecs.size(); ++idx) {
    const AggrFuncType spec = state->mFuncSpecs[idx];
    if (spec == AggrFuncType::IntSum) {
      auto *sum = dynamic_cast<AggrSum<int>*>(state->mAggrFuncs[idx]);
      results.append(sum->value());
    } else if (spec == AggrFuncType::Count) {
      auto *count = dynamic_cast<AggrCount<int, int>*>(state->mAggrFuncs[idx]);
      results.append(count->value());
    }
  }
  return makeTuplePtr(bp::object(bp::tuple(results)));
}
......@@ -78,11 +78,40 @@ PyPipe PyPipe::assignTimestamps(bp::object fun) {
auto pipe = boost::get<TuplePipe&>(pipeImpl);
return PyPipe(pipe.assignTimestamps([fun](auto tp) -> Timestamp {
auto res = fun(get<0>(tp));
return (Timestamp) bp::extract<long>(res);
return (Timestamp) bp::extract<long>(res);
}));
}
/**
 * Wraps the Python callable into a key extractor: the callable is invoked
 * with the Python tuple stored in the first field of the stream tuple and
 * its result is used as key.
 */
PyPipe PyPipe::keyBy(bp::object fun) {
  auto tuplePipe = boost::get<TuplePipe&>(pipeImpl);
  auto extractKey = [fun](auto tp) {
    return fun(get<0>(tp));
  };
  return PyPipe(tuplePipe.keyBy<bp::object>(extractKey));
}
/**
 * Builds a PyAggregateState from the two Python lists and appends an
 * aggregation operator using it.
 *
 * The lists are read position by position, therefore they have to be of the
 * same length. A mismatch raises a Python ValueError up front instead of
 * silently dropping surplus entries of @c aggrFuncs or failing later with an
 * index error.
 */
PyPipe PyPipe::aggregate(bp::list columns, bp::list aggrFuncs) {
  const bp::ssize_t numAggregates = bp::len(columns);
  if (numAggregates != bp::len(aggrFuncs)) {
    PyErr_SetString(PyExc_ValueError,
        "aggregate: columns and aggrFuncs must have the same length");
    bp::throw_error_already_set();
  }

  std::vector<int> columnVec;
  std::vector<AggrFuncType> funcVec;
  columnVec.reserve(static_cast<std::size_t>(numAggregates));
  funcVec.reserve(static_cast<std::size_t>(numAggregates));
  for (bp::ssize_t i = 0; i < numAggregates; ++i) {
    columnVec.push_back(bp::extract<int>(columns[i]));
    funcVec.push_back(bp::extract<AggrFuncType>(aggrFuncs[i]));
  }

  auto pipe = boost::get<TuplePipe&>(pipeImpl);
  auto state = std::make_shared<PyAggregateState>(columnVec, funcVec);
  return PyPipe(pipe.aggregate<PyTuplePtr, PyAggregateState>(state, PyAggregateState::finalize,
    PyAggregateState::iterate));
}
PyPipe PyPipe::print() {
/*
auto pipe = boost::get<TuplePipe&>(pipeImpl);
bp::object pyfabricModule = bp::import("pyfabric");
bp::object callback = pyfabricModule.attr("print_cb");
return PyPipe(pipe.notify([callback](auto tp, bool o) {
callback(get<0>(tp), o);
}));
*/
auto pipe = boost::get<TuplePipe&>(pipeImpl);
auto pyFormatter = [](std::ostream& os, auto tp) {
auto pyObj = get<0>(tp);
......@@ -118,11 +147,27 @@ BOOST_PYTHON_MODULE(pyfabric) {
.def("where", &pfabric::PyPipe::where)
.def("map", &pfabric::PyPipe::map)
.def("assign_timestamps", &pfabric::PyPipe::assignTimestamps)
.def("keyBy", &pfabric::PyPipe::keyBy)
.def("aggregate", &pfabric::PyPipe::aggregate)
.def("sliding_window", &pfabric::PyPipe::slidingWindow)
.def("notify", &pfabric::PyPipe::notify)
.def("print", &pfabric::PyPipe::print)
;
// Expose AggrFuncType to Python as the enum `aggr`. Its values are what
// PyPipe::aggregate extracts from the entries of its aggrFuncs list.
// All enumerators are registered here, but the constructor of
// PyAggregateState only creates aggregate functions for IntSum and Count.
bp::enum_<pfabric::AggrFuncType>("aggr")
.value("IntSum", pfabric::AggrFuncType::IntSum)
.value("DoubleSum", pfabric::AggrFuncType::DoubleSum)
.value("IntAvg", pfabric::AggrFuncType::IntAvg)
.value("DoubleAvg", pfabric::AggrFuncType::DoubleAvg)
.value("Count", pfabric::AggrFuncType::Count)
.value("IntMin", pfabric::AggrFuncType::IntMin)
.value("DoubleMin", pfabric::AggrFuncType::DoubleMin)
.value("StringMin", pfabric::AggrFuncType::StringMin)
.value("IntMax", pfabric::AggrFuncType::IntMax)
.value("DoubleMax", pfabric::AggrFuncType::DoubleMax)
.value("StringMax", pfabric::AggrFuncType::StringMax)
;
bp::enum_<pfabric::WindowParams::WinType>("wintype")
.value("range", pfabric::WindowParams::RangeWindow)
.value("row", pfabric::WindowParams::RowWindow)
......
......@@ -29,10 +29,35 @@ namespace bp = boost::python;
namespace pfabric {
/// The aggregate function types which can be specified via the Python
/// interface. The enumerator order is kept as is because it determines the
/// underlying values.
enum class AggrFuncType {
  IntSum,
  DoubleSum,
  Count,
  IntAvg,
  DoubleAvg,
  IntMin,
  DoubleMin,
  StringMin,
  IntMax,
  DoubleMax,
  StringMax
};
/// Stream elements of the Python interface are tuples with exactly one
/// field, and this field holds a Python tuple object.
using PyTuplePtr = TuplePtr<bp::object>;
/**
 * @brief The aggregate state used for aggregations defined via the Python
 *        interface.
 *
 * The state is described by two parallel lists - the columns and the
 * aggregate function types - and holds one aggregate function object per
 * entry. These function objects are allocated with @c new by the constructor
 * and kept as raw pointers, therefore the state owns them: the destructor
 * releases them, and copying is disabled because a copy would share the
 * same pointers and free them a second time.
 */
struct PyAggregateState : public AggregateStateBase<PyTuplePtr> {
public:
  typedef std::shared_ptr<PyAggregateState> AggrStatePtr;

  /**
   * @brief Creates the state for the given columns and function types.
   *
   * @param[in] cols
   *    the columns the aggregates refer to
   * @param[in] funcs
   *    the aggregate function types
   */
  PyAggregateState(const std::vector<int>& cols, const std::vector<AggrFuncType>& funcs);

  /// Releases the aggregate function objects owned by this state.
  virtual ~PyAggregateState() {
    for (auto* func : mAggrFuncs) {
      delete func;
    }
  }

  /// Not copyable, see the ownership note in the class description.
  PyAggregateState(const PyAggregateState&) = delete;
  PyAggregateState& operator=(const PyAggregateState&) = delete;

  /// Calls init() on all aggregate function objects.
  virtual void init();

  /// Updates all aggregate functions of @c state with the tuple @c tp.
  static void iterate(const PyTuplePtr& tp, AggrStatePtr state, const bool outdated);

  /// Produces a tuple with the current values of all aggregates of @c state.
  static PyTuplePtr finalize(AggrStatePtr state);

  std::vector<int> mColumns;                    //< the column per aggregate
  std::vector<AggrFuncType> mFuncSpecs;         //< the function type per aggregate
  std::vector<AggregateFuncBasePtr> mAggrFuncs; //< the owned aggregate function objects
};
/**
* @brief PyPipe represents a sequence of operators applied to a data stream.
*
......@@ -122,6 +147,20 @@ struct PyPipe {
*/
PyPipe assignTimestamps(bp::object fun);
/**
* @brief Defines the key extractor function for all subsequent operators.
*
* Defines a function for extracting a key value from a tuple which is used
* for all subsequent operators which require such a function,
* e.g. join, groupBy.
*
* @param[in] fun
* a function for extracting the key from the tuple
* @return a new pipe
*/
PyPipe keyBy(bp::object fun);
/**
* @brief Creates a sliding window operator as the next operator on the pipe.
*
......@@ -139,6 +178,8 @@ struct PyPipe {
*/
PyPipe slidingWindow(WindowParams::WinType wt, unsigned int size, unsigned int interval = 0);
/**
 * @brief Creates an aggregation operator as the next operator on the pipe.
 *
 * The two lists are read position by position: entry i of @c columns and
 * entry i of @c aggrFuncs together specify the i-th aggregate.
 *
 * @param[in] columns
 *    a Python list of integers, the column specification of the aggregates
 * @param[in] aggrFuncs
 *    a Python list of AggrFuncType values, the aggregate function types
 * @return a new pipe
 */
PyPipe aggregate(bp::list columns, bp::list aggrFuncs);
/**
* @brief Creates a print operator.
*
......
......@@ -119,6 +119,15 @@ namespace pfabric {
mLastTriggerTime(0), mTriggerType(tType), mTriggerInterval( tInterval ), mCounter(0) {
}
/**
 * Create a new aggregation operator which works on an aggregate state
 * object provided by the caller.
 *
 * A TriggerNotifier bound to notificationCallback() is only created if the
 * trigger type is TriggerByTime and the interval is greater than 0;
 * in all other cases no notifier is set (nullptr).
 *
 * @param state
 *    pointer to the aggregate state used by this operator
 * @param final_fun
 *    the function stored as final function of the operator
 * @param it_fun
 *    the function stored as iterate function of the operator
 * @param tType
 *    the trigger type stored for the operator (default TriggerAll)
 * @param tInterval
 *    the trigger interval stored for the operator (default 0)
 */
Aggregation(AggregateStatePtr state, FinalFunc final_fun, IterateFunc it_fun,
AggregationTriggerType tType = TriggerAll, const unsigned int tInterval = 0) :
mAggrState(state),
mIterateFunc( it_fun ),
mFinalFunc( final_fun ),
mNotifier(tInterval > 0 && tType == TriggerByTime ?
new TriggerNotifier(std::bind(&Aggregation::notificationCallback, this), tInterval) : nullptr),
mLastTriggerTime(0), mTriggerType(tType), mTriggerInterval( tInterval ), mCounter(0) {
}
/**
* Create a new aggregation operator which receives an input stream and
* applies the given aggregate function incrementally with a TriggerByTimestamp
......@@ -142,6 +151,14 @@ namespace pfabric {
mNotifier(nullptr), mLastTriggerTime(0), mCounter(0) {
}
/**
 * Create a new aggregation operator which works on an aggregate state
 * object provided by the caller and uses the TriggerByTimestamp trigger
 * type. No notifier is created by this constructor (nullptr).
 *
 * @param state
 *    pointer to the aggregate state used by this operator
 * @param final_fun
 *    the function stored as final function of the operator
 * @param it_fun
 *    the function stored as iterate function of the operator
 * @param func
 *    the function stored as timestamp extractor of the operator
 * @param tInterval
 *    the trigger interval stored for the operator
 */
Aggregation(AggregateStatePtr state, FinalFunc final_fun, IterateFunc it_fun,
TimestampExtractorFunc func, const unsigned int tInterval) :
mAggrState(state),
mIterateFunc( it_fun ), mFinalFunc( final_fun ),
mTimestampExtractor(func),
mTriggerType(TriggerByTimestamp), mTriggerInterval( tInterval ),
mNotifier(nullptr), mLastTriggerTime(0), mCounter(0) {
}
/**
* @brief Bind the callback for the data channel.
*/
......
......@@ -18,7 +18,7 @@
* along with this program; see the file LICENSE.
* If not you can find the GPL at http://www.gnu.org/copyleft/gpl.html
*/
#ifndef AggregateFunc_hpp_
#define AggregateFunc_hpp_
......@@ -36,11 +36,22 @@ namespace pfabric {
* generally a bad idea since the object instance does not exist completely,
* so it might resolve to an incomplete type. CRTP would avoid this.
*/
/**
 * @brief Non-template base class of the aggregate functions.
 *
 * It only declares the init() operation and a virtual destructor, so that
 * aggregate functions can be referenced and deleted via a pointer to this
 * class. It cannot be instantiated directly: init() is pure virtual and the
 * constructor is protected.
 */
class AggregateFuncBase {
public:
  virtual ~AggregateFuncBase() = default;

  /// Initializes the aggregate function; implemented by the derived classes.
  virtual void init() = 0;

protected:
  AggregateFuncBase() = default;
};

/// Plain (raw) pointer to an aggregate function.
using AggregateFuncBasePtr = AggregateFuncBase*;
template<
typename Tin,
typename Tres
>
class AggregateFunc {
class AggregateFunc : public AggregateFuncBase {
public:
typedef Tres ResultType;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment