Commit f069941c authored by Kai-Uwe Sattler's avatar Kai-Uwe Sattler
Browse files

Initial checkin of Python integration

parent e78fba7f
......@@ -23,3 +23,4 @@ operators and utility classes. It consists of the following main components:
+ [Tutorial: How to build and use a Docker image](/documentation/Docker.md)
+ [PipeFabric Use Cases](/documentation/UseCases.md)
+ [Additional network sources](/documentation/Network.md)
+ [Python Integration](/documentation/Python.md)
### Python Integration ###
In addition to the C++ DSL, PipeFabric also provides a Python integration. This
allows you to construct topologies (i.e. stream queries) in Python without the need
to write and compile C++ code. Due to the nature of Python, this is mainly
intended for prototyping - for applications requiring low-latency processing, the
native C++ interface should be chosen.
#### Using PipeFabric.Python ####
PipeFabric comes as a single dynamic library `libpfabric` which can be imported
in Python. Similar to the C++ DSL, topologies can be created and used to construct
the dataflow graph by adding operators in dot notation. Because the tuples
processed in such a topology are native Python tuples, the lambda functions of the
different operators (e.g. `where`, `map`, etc.) can be written directly in Python.
```Python
import libpfabric
t = libpfabric.Topology()
p = t.newStreamFromFile("data.csv") \
.extract(',') \
.map(lambda t, o: (int(t[0]), t[1], t[2])) \
.where(lambda x, o: x[0] > 1) \
.print()
```
Finally, to start the execution the `start` method of the topology object is invoked.
```Python
t.start()
```
......@@ -4,7 +4,7 @@ project (pipefabric)
set(CMAKE_MACOSX_RPATH 1)
set (PipeFabric_VERSION_MAJOR 0)
set (PipeFabric_VERSION_MINOR 2)
set (PipeFabric_VERSION_MINOR 3)
include(CTest)
......@@ -231,6 +231,7 @@ find_package(Boost ${BOOST_MIN_VERSION} REQUIRED COMPONENTS
serialization
thread
regex
python3
chrono
date_time
)
......@@ -262,6 +263,29 @@ set(BOOST_LIBRARIES
${DYLIB_LIBRARY}
)
######################
# Python
######################
#
# Locate the interpreter first so that the libraries found afterwards belong
# to the same Python version (the order recommended by the FindPythonLibs
# module documentation).
find_package(PythonInterp)
find_package(PythonLibs)
if (PYTHONLIBS_FOUND)
  message(STATUS "Python Version ${PYTHON_VERSION_MAJOR} found - building the Python API.")
  # PYTHON_INCLUDE_DIRS is provided by find_package(PythonLibs); it must not be
  # overridden with a machine-specific absolute path.
  message("PythonInclude ${PYTHON_INCLUDE_DIRS}")
  # NOTE(review): PYTHON_LIBDIR is not set anywhere in the visible part of this
  # file - confirm where it is supposed to come from.
  message("PythonLibs ${PYTHON_LIBDIR}")
  include_directories(${PYTHON_INCLUDE_DIRS})
  link_directories(${PYTHON_LIBDIR})
  set(PYTHON_SOURCES
    python/PyTopology.cpp
  )
else()
  set (PYTHON_SOURCES "")
endif(PYTHONLIBS_FOUND)
######################
# ZeroMQ library
######################
......@@ -449,6 +473,23 @@ target_link_libraries(pfabric_cep
${BOOST_LIBRARIES}
)
#-----------------------------------------------------------------------------------------
#
# Building PipeFabric Python library
#
# The Python module can only be built when the Python libraries were found;
# otherwise PYTHON_SOURCES is empty and the target would have no real sources.
if (PYTHONLIBS_FOUND)
  add_library(pfabric SHARED
    ${PYTHON_SOURCES}
    # we need this as dependency to download the sources from github
    ${THIRD_PARTY_DIR}/fmt
  )
  target_link_libraries(pfabric
    ${Boost_PYTHON3_LIBRARY}
    ${PYTHON_LIBRARIES}
    pfabric_core
    ${BOOST_LIBRARIES}
  )
endif()
#-----------------------------------------------------------------------------------------
#
# Experimental SQL query compiler
......
......@@ -75,8 +75,7 @@ enum PartitioningState {
/**
* @brief Pipe represents a sequence of operators applied to a data stream.
* Pipes are used
* mainly to construct a dataflow programatically.
* Pipes are used mainly to construct a dataflow programatically.
*
* A Pipe is used to construct and represent a dataflow program. Pipes are
* constructed by
......@@ -720,7 +719,7 @@ class Pipe {
* @tparam T
* the input tuple type (usually a TuplePtr) for the operator.
* @param[in] func
* a function pointer or lambda function implementing a predicate by
* a function pointer or lambda function implementing a predicate.
* returning a @c bool
* value for the input tuple
* @return a new pipe
......
/*
* Copyright (c) 2014-17 The PipeFabric team,
* All Rights Reserved.
*
* This file is part of the PipeFabric package.
*
* PipeFabric is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License (GPL) as
* published by the Free Software Foundation; either version 2 of
* the License, or (at your option) any later version.
*
* This package is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; see the file LICENSE.
* If not you can find the GPL at http://www.gnu.org/copyleft/gpl.html
*/
#include "PyTopology.hpp"
using namespace pfabric;
namespace bp = boost::python;
/**
 * Appends an operator that splits each incoming string tuple into fields.
 *
 * The single string field of the input tuple is cut at every occurrence of
 * @c sep and the non-empty pieces are collected into a Python tuple of
 * strings, which becomes the only field of the resulting tuple.
 *
 * The wrapped pipe has to be a StringPipe; otherwise boost::get throws
 * boost::bad_get.
 *
 * @param[in] sep the field separator character
 * @return a new PyPipe wrapping the resulting Pipe<PyTuplePtr>
 */
PyPipe PyPipe::extract(char sep) {
  auto pipe = boost::get<StringPipe&>(pipeImpl);
  return PyPipe(pipe.map<PyTuplePtr>([sep](auto tp, bool) -> PyTuplePtr {
    bp::list seq;
    // The scan relies on the string data being NUL-terminated, exactly as
    // the previous implementation did.
    auto s = get<0>(tp).begin();
    while (*s) {
      const auto item = s;  // start of the current field
      while (*s && *s != sep) ++s;
      if (s != item) {
        seq.append(std::string(item, s - item));
      }
      // TODO: empty fields are currently dropped instead of being mapped
      // to a null value.

      // Skip the separator only. When the scan stopped at the terminating
      // NUL we must not advance, otherwise the outer loop would read past
      // the end of the string.
      if (*s) ++s;
    }
    return makeTuplePtr(bp::object(bp::tuple(seq)));
  }));
}
/**
 * Appends a filter operator whose predicate is a Python callable.
 *
 * The callable is invoked with the Python object stored in field 0 of the
 * tuple and the boolean flag handed to the predicate by the operator.
 *
 * @param[in] pred the Python callable used as filter predicate
 * @return a new PyPipe wrapping the filtered pipe
 */
PyPipe PyPipe::where(bp::object pred) {
  auto source = boost::get<TuplePipe&>(pipeImpl);
  auto predicate = [pred](auto tp, bool flag) {
    return pred(get<0>(tp), flag);
  };
  return PyPipe(source.where(predicate));
}
/**
 * Appends a map operator whose mapping function is a Python callable.
 *
 * The callable receives the Python object stored in field 0 of the tuple
 * together with the operator's boolean flag; whatever it returns is wrapped
 * into a new single-field tuple.
 *
 * @param[in] fun the Python callable producing the new tuple content
 * @return a new PyPipe wrapping the mapped pipe
 */
PyPipe PyPipe::map(bp::object fun) {
  auto source = boost::get<TuplePipe&>(pipeImpl);
  auto projection = [fun](auto tp, bool flag) {
    // The call result is passed on as a temporary, just like before.
    return makeTuplePtr(fun(get<0>(tp), flag));
  };
  return PyPipe(source.map<PyTuplePtr>(projection));
}
/**
 * Appends an operator that prints every tuple to std::cout.
 *
 * The elements of the Python object stored in field 0 are converted with
 * Python's str() and written as a ", "-separated list, one tuple per line.
 *
 * @return a new PyPipe wrapping the pipe returned by the print operator
 */
PyPipe PyPipe::print() {
  auto writeTuple = [](std::ostream& os, auto tp) {
    auto fields = get<0>(tp);
    for (auto idx = 0; idx < len(fields); ++idx) {
      if (idx != 0) {
        os << ", ";
      }
      const std::string text = bp::extract<std::string>(bp::str(fields[idx]))();
      os << text;
    }
    os << std::endl;
  };

  auto source = boost::get<TuplePipe&>(pipeImpl);
  return PyPipe(source.print(std::cout, writeTuple));
}
/**
 * Creates the wrapper and asks the embedded PipeFabric context for a new
 * topology.
 */
PyTopology::PyTopology()
    // ctx is declared before topo in the struct, so it is already
    // constructed when it is used here.
    : topo(ctx.createTopology()) {}
/**
 * Creates a pipe that is fed from the given file.
 *
 * @param[in] file the name of the input file
 * @return a PyPipe wrapping the pipe created by the underlying topology
 */
PyPipe PyTopology::newStreamFromFile(std::string file) {
  PyPipe wrapped(topo->newStreamFromFile(file));
  return wrapped;
}
/**
 * Starts the wrapped topology by calling its start method with @c false.
 */
void PyTopology::start() {
  auto& topology = *topo;
  topology.start(false);
}
/**
 * Definition of the Python extension module @c libpfabric.
 *
 * Exposes PyTopology as "Topology" and PyPipe as "Pipe". Pipe is registered
 * with bp::no_init, so it cannot be instantiated from Python directly.
 */
BOOST_PYTHON_MODULE(libpfabric) {
  bp::class_<pfabric::PyTopology> topologyClass("Topology");
  topologyClass.def("newStreamFromFile", &pfabric::PyTopology::newStreamFromFile);
  topologyClass.def("start", &pfabric::PyTopology::start);

  bp::class_<pfabric::PyPipe> pipeClass("Pipe", bp::no_init);
  pipeClass.def("extract", &pfabric::PyPipe::extract);
  pipeClass.def("where", &pfabric::PyPipe::where);
  pipeClass.def("map", &pfabric::PyPipe::map);
  pipeClass.def("print", &pfabric::PyPipe::print);
}
/*
* Copyright (c) 2014-17 The PipeFabric team,
* All Rights Reserved.
*
* This file is part of the PipeFabric package.
*
* PipeFabric is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License (GPL) as
* published by the Free Software Foundation; either version 2 of
* the License, or (at your option) any later version.
*
* This package is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; see the file LICENSE.
* If not you can find the GPL at http://www.gnu.org/copyleft/gpl.html
*/
#ifndef PyTopology_hpp_
#define PyTopology_hpp_
#include "pfabric.hpp"
#include <boost/python.hpp>
#include <boost/variant.hpp>
namespace bp = boost::python;
namespace pfabric {
/// The only tuple type handled by the Python integration: a tuple with a
/// single field that holds a Python object (representing a Python tuple).
using PyTuplePtr = TuplePtr<bp::object>;
/**
 * @brief PyPipe is the Python-facing counterpart of the @c Pipe class.
 *
 * A PyPipe wraps either a Pipe<TStringPtr> or a Pipe<PyTuplePtr> and offers
 * the operators that can be chained from Python to build a dataflow.
 */
struct PyPipe {
  /// Pipe carrying tuples with a single string field.
  using StringPipe = Pipe<TStringPtr>;
  /// Pipe carrying tuples with a single Python object field.
  using TuplePipe = Pipe<PyTuplePtr>;

  /// The wrapped pipe; holds exactly one of the two supported pipe types.
  boost::variant<StringPipe, TuplePipe> pipeImpl;

  /**
   * @brief Wraps a Pipe<TStringPtr> into a new PyPipe object.
   *
   * @param[in] p the pipe to wrap
   */
  PyPipe(StringPipe p) : pipeImpl(p) {}

  /**
   * @brief Wraps a Pipe<PyTuplePtr> into a new PyPipe object.
   *
   * @param[in] p the pipe to wrap
   */
  PyPipe(TuplePipe p) : pipeImpl(p) {}

  /**
   * @brief Creates a data extraction operator.
   *
   * Adds an operator as the next operator on the pipe which extracts the
   * fields from a simple string tuple. The result is a Python tuple
   * consisting of string fields.
   *
   * @param[in] sep the character separating the fields
   * @return a new PyPipe object
   */
  PyPipe extract(char sep);

  /**
   * @brief Creates a filter operator for selecting tuples.
   *
   * Adds a filter operator as the next operator on the pipe which forwards
   * only tuples satisfying the given predicate written in Python.
   *
   * @param[in] pred a Python function or lambda implementing the predicate
   * @return a new PyPipe object
   */
  PyPipe where(bp::object pred);

  /**
   * @brief Creates a projection operator.
   *
   * Adds a map operator as the next operator on the pipe which applies a
   * mapping (projection) function written in Python to each tuple.
   *
   * @param[in] fun a Python function or lambda producing a new tuple from
   *            the input tuple
   * @return a new PyPipe object
   */
  PyPipe map(bp::object fun);

  /**
   * @brief Creates a print operator.
   *
   * Adds an operator as the next operator on the pipe which prints the
   * tuples to the console.
   *
   * @return a new PyPipe object
   */
  PyPipe print();
};
/**
 * @brief PyTopology is the Python wrapper around PipeFabric's Topology class.
 *
 * It is the entry point for building a stream processing query in Python:
 * it creates pipes with data sources as publishers, to which further stream
 * operators can be connected.
 *
 * @code
 * import libpfabric
 *
 * t = libpfabric.Topology()
 * p = t.newStreamFromFile("data.csv") \
 *      .extract(',') \
 *      .map(lambda t, o: (int(t[0]), t[1], t[2])) \
 *      .where(lambda x, o: x[0] > 1) \
 *      .print()
 *
 * t.start()
 * @endcode
 */
struct PyTopology {
  /// The PipeFabric context from which the topology is created.
  PFabricContext ctx;
  /// The wrapped topology object.
  PFabricContext::TopologyPtr topo;

  /**
   * @brief Creates a new @c PyTopology object.
   */
  PyTopology();

  /**
   * @brief Creates a pipe from a text file source as input.
   *
   * Creates a new pipe for reading tuples (containing only a string field
   * representing a line of the file) via a TextFileSource operator.
   *
   * @param[in] file
   *      the name of the file from which the tuples are read.
   * @return a new PyPipe object
   */
  PyPipe newStreamFromFile(std::string file);

  /**
   * @brief Starts processing of the whole topology.
   */
  void start();
};
}
#endif
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment