Commit 42d616af authored by Kai-Uwe Sattler's avatar Kai-Uwe Sattler
Browse files

Minor fixes in Python interface

parent e95847c9
......@@ -40,6 +40,15 @@ void PyAggregateState::setupAggregateFuncs() {
case AggrFuncType::GroupID:
mAggrFuncs.push_back(new AggrIdentity<std::string>());
break;
case AggrFuncType::IntIdentity:
mAggrFuncs.push_back(new AggrIdentity<int>());
break;
case AggrFuncType::DoubleIdentity:
mAggrFuncs.push_back(new AggrIdentity<double>());
break;
case AggrFuncType::StringIdentity:
mAggrFuncs.push_back(new AggrIdentity<std::string>());
break;
case AggrFuncType::IntSum:
mAggrFuncs.push_back(new AggrSum<int>());
break;
......@@ -94,6 +103,24 @@ void PyAggregateState::iterateForKey(const PyTuplePtr& tp, const std::string& ke
aggr->iterate(key, outdated);
break;
}
case AggrFuncType::IntIdentity: {
AggrIdentity<int> *aggr = dynamic_cast<AggrIdentity<int>*>(state->mAggrFuncs[i]);
int val = bp::extract<int>(pyObj);
aggr->iterate(val, outdated);
break;
}
case AggrFuncType::DoubleIdentity: {
AggrIdentity<double> *aggr = dynamic_cast<AggrIdentity<double>*>(state->mAggrFuncs[i]);
double val = bp::extract<double>(pyObj);
aggr->iterate(val, outdated);
break;
}
case AggrFuncType::StringIdentity: {
AggrIdentity<std::string> *aggr = dynamic_cast<AggrIdentity<std::string>*>(state->mAggrFuncs[i]);
std::string val = bp::extract<std::string>(pyObj);
aggr->iterate(val, outdated);
break;
}
case AggrFuncType::IntSum: {
AggrSum<int> *aggr = dynamic_cast<AggrSum<int>*>(state->mAggrFuncs[i]);
int val = bp::extract<int>(pyObj);
......@@ -113,12 +140,14 @@ void PyAggregateState::iterateForKey(const PyTuplePtr& tp, const std::string& ke
}
case AggrFuncType::IntAvg: {
AggrAvg<int, int> *aggr = dynamic_cast<AggrAvg<int, int>*>(state->mAggrFuncs[i]);
aggr->iterate(1, outdated);
int val = bp::extract<int>(pyObj);
aggr->iterate(val, outdated);
break;
}
case AggrFuncType::DoubleAvg: {
AggrAvg<double, double> *aggr = dynamic_cast<AggrAvg<double, double>*>(state->mAggrFuncs[i]);
aggr->iterate(1, outdated);
double val = bp::extract<double>(pyObj);
aggr->iterate(val, outdated);
break;
}
case AggrFuncType::IntMin: {
......@@ -173,6 +202,24 @@ void PyAggregateState::iterate(const PyTuplePtr& tp,
for (std::size_t i = 0; i < state->mFuncSpecs.size(); i++) {
auto pyObj = tup[state->mColumns[i]];
switch (state->mFuncSpecs[i]) {
case AggrFuncType::IntIdentity: {
AggrIdentity<int> *aggr = dynamic_cast<AggrIdentity<int>*>(state->mAggrFuncs[i]);
int val = bp::extract<int>(pyObj);
aggr->iterate(val, outdated);
break;
}
case AggrFuncType::DoubleIdentity: {
AggrIdentity<double> *aggr = dynamic_cast<AggrIdentity<double>*>(state->mAggrFuncs[i]);
double val = bp::extract<double>(pyObj);
aggr->iterate(val, outdated);
break;
}
case AggrFuncType::StringIdentity: {
AggrIdentity<std::string> *aggr = dynamic_cast<AggrIdentity<std::string>*>(state->mAggrFuncs[i]);
std::string val = bp::extract<std::string>(pyObj);
aggr->iterate(val, outdated);
break;
}
case AggrFuncType::IntSum: {
AggrSum<int> *aggr = dynamic_cast<AggrSum<int>*>(state->mAggrFuncs[i]);
int val = bp::extract<int>(pyObj);
......@@ -192,12 +239,14 @@ void PyAggregateState::iterate(const PyTuplePtr& tp,
}
case AggrFuncType::IntAvg: {
AggrAvg<int, int> *aggr = dynamic_cast<AggrAvg<int, int>*>(state->mAggrFuncs[i]);
aggr->iterate(1, outdated);
int val = bp::extract<int>(pyObj);
aggr->iterate(val, outdated);
break;
}
case AggrFuncType::DoubleAvg: {
AggrAvg<double, double> *aggr = dynamic_cast<AggrAvg<double, double>*>(state->mAggrFuncs[i]);
aggr->iterate(1, outdated);
double val = bp::extract<double>(pyObj);
aggr->iterate(val, outdated);
break;
}
case AggrFuncType::IntMin: {
......@@ -255,6 +304,21 @@ PyTuplePtr PyAggregateState::finalize(AggrStatePtr state) {
seq.append(aggr->value());
break;
}
case AggrFuncType::IntIdentity: {
AggrIdentity<int> *aggr = dynamic_cast<AggrIdentity<int>*>(state->mAggrFuncs[i]);
seq.append(aggr->value());
break;
}
case AggrFuncType::DoubleIdentity: {
AggrIdentity<double> *aggr = dynamic_cast<AggrIdentity<double>*>(state->mAggrFuncs[i]);
seq.append(aggr->value());
break;
}
case AggrFuncType::StringIdentity: {
AggrIdentity<std::string> *aggr = dynamic_cast<AggrIdentity<std::string>*>(state->mAggrFuncs[i]);
seq.append(aggr->value());
break;
}
case AggrFuncType::IntSum: {
AggrSum<int> *aggr = dynamic_cast<AggrSum<int>*>(state->mAggrFuncs[i]);
seq.append(aggr->value());
......
......@@ -182,7 +182,7 @@ void PyTopology::start() {
BOOST_PYTHON_MODULE(pyfabric) {
bp::class_<pfabric::PyTopology>("Topology")
.def("newStreamFromFile", &pfabric::PyTopology::newStreamFromFile)
.def("stream_from_file", &pfabric::PyTopology::newStreamFromFile)
.def("start", &pfabric::PyTopology::start)
;
......@@ -191,7 +191,7 @@ BOOST_PYTHON_MODULE(pyfabric) {
.def("where", &pfabric::PyPipe::where)
.def("map", &pfabric::PyPipe::map)
.def("assign_timestamps", &pfabric::PyPipe::assignTimestamps)
.def("keyBy", &pfabric::PyPipe::keyBy)
.def("key_by", &pfabric::PyPipe::keyBy)
.def("aggregate", &pfabric::PyPipe::aggregate)
.def("groupby_key", &pfabric::PyPipe::groupBy)
.def("sliding_window", &pfabric::PyPipe::slidingWindow)
......@@ -203,6 +203,9 @@ BOOST_PYTHON_MODULE(pyfabric) {
;
bp::enum_<pfabric::AggrFuncType>("aggr")
.value("Int", pfabric::AggrFuncType::IntIdentity)
.value("String", pfabric::AggrFuncType::StringIdentity)
.value("Double", pfabric::AggrFuncType::DoubleIdentity)
.value("IntSum", pfabric::AggrFuncType::IntSum)
.value("DoubleSum", pfabric::AggrFuncType::DoubleSum)
.value("IntAvg", pfabric::AggrFuncType::IntAvg)
......
......@@ -35,6 +35,7 @@ enum class AggrFuncType {
IntAvg, DoubleAvg,
IntMin, DoubleMin, StringMin,
IntMax, DoubleMax, StringMax,
IntIdentity, DoubleIdentity, StringIdentity,
GroupID
};
......
......@@ -20,7 +20,7 @@ class TestPipeFabricPython(unittest.TestCase):
strm = []
expected = ['1','teststring','1.5','2','teststring','2.5','3','teststring','3.5']
p = t.newStreamFromFile("data.csv") \
p = t.stream_from_file("data.csv") \
.extract(',') \
.notify(lambda tup, o: strm.extend((tup[0], tup[1], tup[2]))) \
.pfprint() \
......@@ -44,7 +44,7 @@ class TestPipeFabricPython(unittest.TestCase):
strm = []
expected = [(1,'teststring','1.5'),(2,'teststring','2.5'),(3,'teststring','3.5')]
p = t.newStreamFromFile("data.csv") \
p = t.stream_from_file("data.csv") \
.extract(',') \
.map(lambda t, o: (int(t[0]), t[1], t[2])) \
.notify(lambda t, o: strm.append(t)) \
......@@ -68,7 +68,7 @@ class TestPipeFabricPython(unittest.TestCase):
strm = []
expected = [(2,'teststring','2.5'),(3,'teststring','3.5')]
p = t.newStreamFromFile("data.csv") \
p = t.stream_from_file("data.csv") \
.extract(',') \
.map(lambda t, o: (int(t[0]), t[1], t[2])) \
.where(lambda x, o: x[0] > 1) \
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment