Commit b63126e2 authored by Kai-Uwe Sattler's avatar Kai-Uwe Sattler
Browse files

Fix in SHJoin; join added to python

parent 5e0c076f
......@@ -74,6 +74,11 @@ PyPipe PyPipe::notify(bp::object fun) {
}));
}
PyPipe PyPipe::queue() {
auto pipe = boost::get<TuplePipe&>(pipeImpl);
return PyPipe(pipe.queue());
}
PyPipe PyPipe::assignTimestamps(bp::object fun) {
auto pipe = boost::get<TuplePipe&>(pipeImpl);
return PyPipe(pipe.assignTimestamps([fun](auto tp) -> Timestamp {
......@@ -144,6 +149,25 @@ PyPipe PyPipe::print() {
return PyPipe(pipe.print(std::cout, pyFormatter));
}
PyPipe PyPipe::join(PyPipe other, bp::object pred) {
auto pipe = boost::get<TuplePipe&>(pipeImpl);
auto otherPipe = boost::get<TuplePipe&>(other.pipeImpl);
auto joinPipe = pipe.join<std::string>(otherPipe, [pred](auto tp1, auto tp2) {
return pred(get<0>(tp1), get<0>(tp2));
});
return PyPipe(joinPipe.map<PyTuplePtr>([](auto tp, bool o) -> PyTuplePtr {
bp::list seq;
auto tp1 = get<0>(tp);
for (auto i = 0; i < bp::len(tp1); i++) seq.append(tp1[i]);
auto tp2 = get<1>(tp);
for (auto i = 0; i < bp::len(tp2); i++) seq.append(tp2[i]);
return makeTuplePtr(bp::object(bp::tuple(seq)));
}));
}
PyTopology::PyTopology() {
topo = ctx.createTopology();
}
......@@ -171,6 +195,8 @@ BOOST_PYTHON_MODULE(pyfabric) {
.def("aggregate", &pfabric::PyPipe::aggregate)
.def("groupby_key", &pfabric::PyPipe::groupBy)
.def("sliding_window", &pfabric::PyPipe::slidingWindow)
.def("join", &pfabric::PyPipe::join)
.def("queue", &pfabric::PyPipe::queue)
.def("notify", &pfabric::PyPipe::notify)
.def("print", &pfabric::PyPipe::print)
;
......
......@@ -196,6 +196,10 @@ struct PyPipe {
PyPipe groupBy(bp::list columns, bp::list aggrFuncs);
PyPipe join(PyPipe other, bp::object predicate);
PyPipe queue();
/**
* @brief Creates a print operator.
*
......
......@@ -212,7 +212,7 @@ namespace pfabric {
typename HashTable,
typename StreamElement
>
static void updateHashTable( HashTable& hashTable, const key_t& key,
static void updateHashTable( HashTable& hashTable, const KeyType& key,
const StreamElement& newElement, const bool outdated, const Lock& lock ) {
boost::ignore_unused( lock );
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment