Commit d3f69916 authored by Kai-Uwe Sattler's avatar Kai-Uwe Sattler
Browse files

DistinctCount added to Python interface

parent 42d616af
......@@ -58,6 +58,9 @@ void PyAggregateState::setupAggregateFuncs() {
case AggrFuncType::Count:
mAggrFuncs.push_back(new AggrCount<int, int>());
break;
case AggrFuncType::DCount:
mAggrFuncs.push_back(new AggrDCount<int, int>());
break;
case AggrFuncType::IntAvg:
mAggrFuncs.push_back(new AggrAvg<int, int>());
break;
......@@ -138,6 +141,12 @@ void PyAggregateState::iterateForKey(const PyTuplePtr& tp, const std::string& ke
aggr->iterate(1, outdated);
break;
}
case AggrFuncType::DCount: {
AggrDCount<int, int> *aggr = dynamic_cast<AggrDCount<int, int>*>(state->mAggrFuncs[i]);
int val = bp::extract<int>(pyObj);
aggr->iterate(val, outdated);
break;
}
case AggrFuncType::IntAvg: {
AggrAvg<int, int> *aggr = dynamic_cast<AggrAvg<int, int>*>(state->mAggrFuncs[i]);
int val = bp::extract<int>(pyObj);
......@@ -237,6 +246,12 @@ void PyAggregateState::iterate(const PyTuplePtr& tp,
aggr->iterate(1, outdated);
break;
}
case AggrFuncType::DCount: {
AggrDCount<int, int> *aggr = dynamic_cast<AggrDCount<int, int>*>(state->mAggrFuncs[i]);
int val = bp::extract<int>(pyObj);
aggr->iterate(val, outdated);
break;
}
case AggrFuncType::IntAvg: {
AggrAvg<int, int> *aggr = dynamic_cast<AggrAvg<int, int>*>(state->mAggrFuncs[i]);
int val = bp::extract<int>(pyObj);
......@@ -334,6 +349,11 @@ PyTuplePtr PyAggregateState::finalize(AggrStatePtr state) {
seq.append(aggr->value());
break;
}
case AggrFuncType::DCount: {
AggrDCount<int, int> *aggr = dynamic_cast<AggrDCount<int, int>*>(state->mAggrFuncs[i]);
seq.append(aggr->value());
break;
}
case AggrFuncType::IntAvg: {
AggrAvg<int, int> *aggr = dynamic_cast<AggrAvg<int, int>*>(state->mAggrFuncs[i]);
seq.append(aggr->value());
......
......@@ -128,15 +128,6 @@ PyPipe PyPipe::groupBy(bp::list columns, bp::list aggrFuncs) {
}
PyPipe PyPipe::print() {
/*
auto pipe = boost::get<TuplePipe&>(pipeImpl);
bp::object pyfabricModule = bp::import("pyfabric");
bp::object callback = pyfabricModule.attr("print_cb");
return PyPipe(pipe.notify([callback](auto tp, bool o) {
callback(get<0>(tp), o);
}));
*/
auto pipe = boost::get<TuplePipe&>(pipeImpl);
auto pyFormatter = [](std::ostream& os, auto tp) {
auto pyObj = get<0>(tp);
......@@ -211,6 +202,7 @@ BOOST_PYTHON_MODULE(pyfabric) {
.value("IntAvg", pfabric::AggrFuncType::IntAvg)
.value("DoubleAvg", pfabric::AggrFuncType::DoubleAvg)
.value("Count", pfabric::AggrFuncType::Count)
.value("DistinctCount", pfabric::AggrFuncType::DCount)
.value("IntMin", pfabric::AggrFuncType::IntMin)
.value("DoubleMin", pfabric::AggrFuncType::DoubleMin)
.value("StringMin", pfabric::AggrFuncType::StringMin)
......
......@@ -31,7 +31,7 @@ namespace pfabric {
enum class AggrFuncType {
IntSum, DoubleSum,
Count,
Count, DCount,
IntAvg, DoubleAvg,
IntMin, DoubleMin, StringMin,
IntMax, DoubleMax, StringMax,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment