Commit 1614c795 authored by Kai-Uwe Sattler's avatar Kai-Uwe Sattler
Browse files

window added to pyfabric

parent f069941c
......@@ -15,9 +15,9 @@ processed in such a topology are native Python tuples, lambda functions of the
different operators (e.g. `where`, `map` etc.) can be written directly in Python.
```Python
import libpfabric
import pyfabric
t = libpfabric.Topology()
t = pyfabric.Topology()
p = t.newStreamFromFile("data.csv") \
.extract(',') \
.map(lambda t, o: (int(t[0]), t[1], t[2])) \
......
......@@ -477,19 +477,20 @@ target_link_libraries(pfabric_cep
#
# Building PipeFabric Python library
#
add_library(pfabric SHARED
add_library(pyfabric SHARED
${PYTHON_SOURCES}
# we need this as dependency to download the sources from github
${THIRD_PARTY_DIR}/fmt
)
target_link_libraries(pfabric
target_link_libraries(pyfabric
${Boost_PYTHON3_LIBRARY}
${PYTHON_LIBRARIES}
pfabric_core
${BOOST_LIBRARIES}
)
set_target_properties(pyfabric PROPERTIES PREFIX "" SUFFIX ".so")
#-----------------------------------------------------------------------------------------
#
# Experimental SQL query compiler
......
......@@ -134,6 +134,9 @@ class Pipe {
numPartitions = p.numPartitions;
dataflow = p.dataflow;
tailIter = p.tailIter;
keyExtractor = p.keyExtractor;
timestampExtractor = p.timestampExtractor;
transactionIDExtractor = p.transactionIDExtractor;
}
private:
......
......@@ -20,6 +20,8 @@
*/
#include "PyTopology.hpp"
#include <boost/python/enum.hpp>
using namespace pfabric;
namespace bp = boost::python;
......@@ -59,6 +61,27 @@ PyPipe PyPipe::map(bp::object fun) {
}));
}
PyPipe PyPipe::slidingWindow(WindowParams::WinType wt, unsigned int size,
unsigned int interval) {
auto pipe = boost::get<TuplePipe&>(pipeImpl);
return PyPipe(pipe.slidingWindow(wt, size, nullptr, interval));
}
PyPipe PyPipe::notify(bp::object fun) {
auto pipe = boost::get<TuplePipe&>(pipeImpl);
return PyPipe(pipe.notify([fun](auto tp, bool o) {
fun(get<0>(tp), o);
}));
}
PyPipe PyPipe::assignTimestamps(bp::object fun) {
auto pipe = boost::get<TuplePipe&>(pipeImpl);
return PyPipe(pipe.assignTimestamps([fun](auto tp) -> Timestamp {
auto res = fun(get<0>(tp));
return (Timestamp) bp::extract<long>(res);
}));
}
PyPipe PyPipe::print() {
auto pipe = boost::get<TuplePipe&>(pipeImpl);
auto pyFormatter = [](std::ostream& os, auto tp) {
......@@ -84,7 +107,7 @@ void PyTopology::start() {
topo->start(false);
}
BOOST_PYTHON_MODULE(libpfabric) {
BOOST_PYTHON_MODULE(pyfabric) {
bp::class_<pfabric::PyTopology>("Topology")
.def("newStreamFromFile", &pfabric::PyTopology::newStreamFromFile)
.def("start", &pfabric::PyTopology::start)
......@@ -94,7 +117,15 @@ BOOST_PYTHON_MODULE(libpfabric) {
.def("extract", &pfabric::PyPipe::extract)
.def("where", &pfabric::PyPipe::where)
.def("map", &pfabric::PyPipe::map)
.def("assign_timestamps", &pfabric::PyPipe::assignTimestamps)
.def("sliding_window", &pfabric::PyPipe::slidingWindow)
.def("notify", &pfabric::PyPipe::notify)
.def("print", &pfabric::PyPipe::print)
;
bp::enum_<pfabric::WindowParams::WinType>("wintype")
.value("range", pfabric::WindowParams::RangeWindow)
.value("row", pfabric::WindowParams::RowWindow)
;
}
......@@ -92,6 +92,53 @@ struct PyPipe {
*/
PyPipe map(bp::object fun);
/**
* @brief Creates a notify operator for passing stream tuples
* to a callback function.
*
* Creates a notify operator for triggering a callback on each input tuple
* and forwarding the tuples to the next operator on the pipe.
* @param[in] func
* a lambda function representing the callback that is invoked for
* each input tuple
* @param[in] pfunc
* an optional lambda function representing the callback
* that is invoked for each punctuation
* @return a new PyPipe object
*/
PyPipe notify(bp::object fun);
/**
* @brief Defines the timestamp extractor function for all subsequent
* operators.
*
* Defines a function for extracting a timestamp from a tuple which is used
* for all subsequent operators which require such a function, e.g. windows.
*
* @param[in] func
* a function for extracting the timestamp of the tuple
* @return a new pipe
*/
PyPipe assignTimestamps(bp::object fun);
/**
* @brief Creates a sliding window operator as the next operator on the pipe.
*
* Creates a sliding window operator of the given type and size.
*
* @param[in] wt
* the type of the window (row or range)
* @param[in] sz
* the window size (in number of tuples for row window or in milliseconds
* for range windows)
* @param[in] ei
* the eviction interval, i.e., time for triggering the eviction (in
* milliseconds)
* @return a new pipe
*/
PyPipe slidingWindow(WindowParams::WinType wt, unsigned int size, unsigned int interval = 0);
/**
* @brief Creates a print operator.
*
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment