Commit 3cb184da authored by Philipp Götze

Merge branch 'master' into wip/nvml

parents 8f9d50ce 767e201b
@@ -21,3 +21,5 @@ operators and utility classes. It consists of the following main components:
+ [Tutorial: Embedding PipeFabric](/documentation/Embedding.md)
+ [Tutorial: Stream partitioning](/documentation/Partitioning.md)
+ [Tutorial: How to build and use a Docker image](/documentation/Docker.md)
+ [PipeFabric Use Cases](/documentation/UseCases.md)
+ [Additional network sources](/documentation/Network.md)
@@ -117,6 +117,20 @@ add_custom_command(
)
endif()
#--------------------------------------------------------------------------------
if(BUILD_USE_CASES)
# data for use cases
download_project(PROJ data
GIT_REPOSITORY https://github.com/dbis-ilm/data.git
GIT_TAG master
UPDATE_DISCONNECTED 1
QUIET
)
file(COPY ${PROJECT_BINARY_DIR}/data-src/DEBS2017
DESTINATION ${THIRD_PARTY_DIR}
)
endif()
#--------------------------------------------------------------------------------
if(USE_NVML_TABLE)
# Non-Volatile Memory Library (pmem.io)
@@ -129,7 +143,7 @@ download_project(PROJ nvml
add_custom_command(
OUTPUT ${THIRD_PARTY_DIR}/nvml
COMMAND ${CMAKE_COMMAND} -E chdir ${nvml_SOURCE_DIR} $(MAKE)
COMMAND ${CMAKE_COMMAND} -E chdir ${nvml_SOURCE_DIR} $(MAKE) install prefix=${THIRD_PARTY_DIR}/nvml
)
# PTable (internal gitlab project) for NVM
@@ -142,6 +156,6 @@ download_project(PROJ ptable
add_custom_command(
OUTPUT ${THIRD_PARTY_DIR}/ptable
COMMAND ${CMAKE_COMMAND} -E chdir ${ptable_SOURCE_DIR} cmake -DPTABLE_DIR=${THIRD_PARTY_DIR}/ptable src
COMMAND ${CMAKE_COMMAND} -E chdir ${ptable_SOURCE_DIR} $(MAKE) install
)
endif()
# - Try to find Eigen3 lib
#
# This module supports requiring a minimum version, e.g. you can do
# find_package(Eigen3 3.1.2)
# to require version 3.1.2 or newer of Eigen3.
#
# Once done this will define
#
# EIGEN3_FOUND - system has eigen lib with correct version
# EIGEN3_INCLUDE_DIR - the eigen include directory
# EIGEN3_VERSION - eigen version
#
# This module reads hints about search locations from
# the following environment variables:
#
# EIGEN3_ROOT
# EIGEN3_ROOT_DIR
# Copyright (c) 2006, 2007 Montel Laurent, <montel@kde.org>
# Copyright (c) 2008, 2009 Gael Guennebaud, <g.gael@free.fr>
# Copyright (c) 2009 Benoit Jacob <jacob.benoit.1@gmail.com>
# Redistribution and use is allowed according to the terms of the 2-clause BSD license.
if(NOT Eigen3_FIND_VERSION)
if(NOT Eigen3_FIND_VERSION_MAJOR)
set(Eigen3_FIND_VERSION_MAJOR 2)
endif(NOT Eigen3_FIND_VERSION_MAJOR)
if(NOT Eigen3_FIND_VERSION_MINOR)
set(Eigen3_FIND_VERSION_MINOR 91)
endif(NOT Eigen3_FIND_VERSION_MINOR)
if(NOT Eigen3_FIND_VERSION_PATCH)
set(Eigen3_FIND_VERSION_PATCH 0)
endif(NOT Eigen3_FIND_VERSION_PATCH)
set(Eigen3_FIND_VERSION "${Eigen3_FIND_VERSION_MAJOR}.${Eigen3_FIND_VERSION_MINOR}.${Eigen3_FIND_VERSION_PATCH}")
endif(NOT Eigen3_FIND_VERSION)
macro(_eigen3_check_version)
file(READ "${EIGEN3_INCLUDE_DIR}/Eigen/src/Core/util/Macros.h" _eigen3_version_header)
string(REGEX MATCH "define[ \t]+EIGEN_WORLD_VERSION[ \t]+([0-9]+)" _eigen3_world_version_match "${_eigen3_version_header}")
set(EIGEN3_WORLD_VERSION "${CMAKE_MATCH_1}")
string(REGEX MATCH "define[ \t]+EIGEN_MAJOR_VERSION[ \t]+([0-9]+)" _eigen3_major_version_match "${_eigen3_version_header}")
set(EIGEN3_MAJOR_VERSION "${CMAKE_MATCH_1}")
string(REGEX MATCH "define[ \t]+EIGEN_MINOR_VERSION[ \t]+([0-9]+)" _eigen3_minor_version_match "${_eigen3_version_header}")
set(EIGEN3_MINOR_VERSION "${CMAKE_MATCH_1}")
set(EIGEN3_VERSION ${EIGEN3_WORLD_VERSION}.${EIGEN3_MAJOR_VERSION}.${EIGEN3_MINOR_VERSION})
if(${EIGEN3_VERSION} VERSION_LESS ${Eigen3_FIND_VERSION})
set(EIGEN3_VERSION_OK FALSE)
else(${EIGEN3_VERSION} VERSION_LESS ${Eigen3_FIND_VERSION})
set(EIGEN3_VERSION_OK TRUE)
endif(${EIGEN3_VERSION} VERSION_LESS ${Eigen3_FIND_VERSION})
if(NOT EIGEN3_VERSION_OK)
message(STATUS "Eigen3 version ${EIGEN3_VERSION} found in ${EIGEN3_INCLUDE_DIR}, "
"but at least version ${Eigen3_FIND_VERSION} is required")
endif(NOT EIGEN3_VERSION_OK)
endmacro(_eigen3_check_version)
if (EIGEN3_INCLUDE_DIR)
# in cache already
_eigen3_check_version()
set(EIGEN3_FOUND ${EIGEN3_VERSION_OK})
else (EIGEN3_INCLUDE_DIR)
# search first if an Eigen3Config.cmake is available in the system,
# if successful this would set EIGEN3_INCLUDE_DIR and the rest of
# the script will work as usual
find_package(Eigen3 ${Eigen3_FIND_VERSION} NO_MODULE QUIET)
if(NOT EIGEN3_INCLUDE_DIR)
find_path(EIGEN3_INCLUDE_DIR NAMES signature_of_eigen3_matrix_library
HINTS
ENV EIGEN3_ROOT
ENV EIGEN3_ROOT_DIR
PATHS
${CMAKE_INSTALL_PREFIX}/include
${KDE4_INCLUDE_DIR}
PATH_SUFFIXES eigen3 eigen
)
endif(NOT EIGEN3_INCLUDE_DIR)
if(EIGEN3_INCLUDE_DIR)
_eigen3_check_version()
endif(EIGEN3_INCLUDE_DIR)
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(Eigen3 DEFAULT_MSG EIGEN3_INCLUDE_DIR EIGEN3_VERSION_OK)
mark_as_advanced(EIGEN3_INCLUDE_DIR)
endif(EIGEN3_INCLUDE_DIR)
## Additional network sources ##
ZeroMQ and REST are already available as network sources, providing tuples via a network connection.
In addition, AMQP (the Advanced Message Queuing Protocol, as used by RabbitMQ), Apache Kafka, and
MQTT (Message Queue Telemetry Transport) can be used as sources. However, additional libraries and
installations are necessary to run these protocols; they are not shipped with PipeFabric by default.
## RabbitMQ ##
### Preliminaries and Installation ###
For AMQP (RabbitMQ):
+ [RabbitMQ Server](https://www.rabbitmq.com/download.html)
+ [Amqpcpp library](https://github.com/akalend/amqpcpp), available on GitHub
The server realizes AMQP, while the Amqpcpp library allows using the server from C++. On Linux,
the server is usually available in the standard repositories, so you can easily install it with
the command `sudo apt-get install rabbitmq-server`.
After the RabbitMQ server and the Amqpcpp library are installed and added to your path (in a way
that CMake can find them), you have to enable the RabbitMQ source for PipeFabric by switching the
CMake variable `USE_RABBITMQ` to `ON`. This can be done manually in the CMakeLists.txt file in the
src folder or by passing `-DUSE_RABBITMQ=ON` to cmake, like `cmake -DUSE_RABBITMQ=ON ../src`.
In addition, you have to start the RabbitMQ server before running the test case. This can be done
on the console with the command `service rabbitmq-server start`. Otherwise the test case will fail
with the error `AMQP cannot create socket`.
### Usage ###
PipeFabric provides an interface to the RabbitMQ server in which all currently available messages
on the server are gathered, transformed to tuples and forwarded to the query. This is done by using
the operator `newStreamFromRabbitMQ`:
`Pipe<TStringPtr> newStreamFromRabbitMQ(const std::string& info, const std::string& queueName)`
Each incoming message of the RabbitMQ service produces a single tuple (consisting of a single
string). The parameter `info` specifies the connection to the server. When the RabbitMQ server is
started without modifications, a default user named "guest" with the password "guest" is available,
so `info` can be given as `guest:guest@localhost:5672`, where 5672 is the port used. The parameter
`queueName` names the queue on which messages are exchanged.
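As an illustration, a minimal query sketch could look as follows. The queue name "orders" and the
simple print sink are assumptions made for this example; the test case referenced below remains the
authoritative usage reference.

```
#include <iostream>
#include "pfabric.hpp"

using namespace pfabric;

int main() {
  PFabricContext ctx;
  auto t = ctx.createTopology();

  // gather the messages currently queued on the local RabbitMQ server
  // and print each resulting string tuple to the console
  auto s = t->newStreamFromRabbitMQ("guest:guest@localhost:5672", "orders")
    .print(std::cout);

  t->start();
  return 0;
}
```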
The operator currently checks once whether messages (tuples) are available on the RabbitMQ server.
If so, all messages are gathered and sent downstream to the subscribers (that is, the following
operator(s)); then it finishes. However, it can easily be adapted to keep waiting and repeatedly
ask the server whether new messages have arrived.
An example (test case) showing how to use it properly can be found in
`/test/RabbitMQSourceTest.cpp` of the source folder.
## Apache Kafka ##
### Preliminaries and Installation ###
For Apache Kafka:
+ [Apache Zookeeper](https://zookeeper.apache.org/)
+ [Apache Kafka server](https://kafka.apache.org/downloads)
+ [Librdkafka](https://github.com/edenhill/librdkafka), C++ Kafka library, available on GitHub
+ [Cppkafka](https://github.com/mfontanini/cppkafka), C++ wrapper around Librdkafka, available on GitHub
The Kafka server is used for exchanging messages and depends on Apache Zookeeper. On most Linux
systems, Zookeeper is available in the standard repositories, so it can be installed with the
command `sudo apt-get install zookeeperd`. To set up the Kafka server on Linux inside your home
directory, you can simply run the following on the command line (using wget):
```
$ mkdir -p ~/kafka
$ cd ~/kafka
$ wget http://www-us.apache.org/dist/kafka/0.10.2.0/kafka_2.10-0.10.2.0.tgz
$ tar xvzf kafka_2.10-0.10.2.0.tgz --strip 1
$ ./bin/kafka-server-start.sh ~/kafka/config/server.properties
```
For deleting topics in Apache Kafka, you should edit the server properties located in
`~/kafka/config/server.properties`, removing the `#` in the line `#delete.topic.enable=true`.
The library `librdkafka` provides C/C++ support for Apache Kafka. The wrapper `cppkafka` builds on
this library and offers a much more user-friendly interface. Both have to be installed in a way
that CMake can find them (libraries and headers).
Apache Kafka is then enabled in PipeFabric by switching the CMake variable `USE_KAFKA` to `ON`.
This can be done manually in the CMakeLists.txt file in the src folder or by passing
`-DUSE_KAFKA=ON` to cmake, like `cmake -DUSE_KAFKA=ON ../src`.
In addition, you have to start the Kafka server before running the test case. This can be done on
the console inside the Kafka folder with the command
`./bin/kafka-server-start.sh ./config/server.properties`. Otherwise the test case will fail with
the error `Connection refused - brokers are down`.
### Usage ###
PipeFabric provides an interface to the Kafka server in which all currently available messages
on the server are gathered, transformed to tuples and forwarded to the query. This is done by using
the operator `newStreamFromKafka`:
`Pipe<TStringPtr> Topology::newStreamFromKafka(const std::string& broker, const std::string& topic, const std::string& groupID)`
Each incoming message from the Kafka server produces a single tuple (consisting of a single string).
The parameter `broker` identifies a cluster instance on the server; it is possible to use your
localhost. The `topic` is the topic on which the data (i.e., the tuples) is exchanged. Finally, the
`groupID` of the consumer states to which group (of producers and consumers) it belongs.
Kafka automatically destroys groups that have no members left.
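Analogous to the RabbitMQ example, a minimal sketch might look like this. The broker address
`127.0.0.1:9092`, the topic "sensor", and the group id "pfabric-demo" are example values chosen for
this illustration, not prescribed by the operator.

```
#include <iostream>
#include "pfabric.hpp"

using namespace pfabric;

int main() {
  PFabricContext ctx;
  auto t = ctx.createTopology();

  // consume the messages currently available on the local broker for the
  // topic "sensor" as a member of the consumer group "pfabric-demo"
  auto s = t->newStreamFromKafka("127.0.0.1:9092", "sensor", "pfabric-demo")
    .print(std::cout);

  t->start();
  return 0;
}
```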
The operator currently checks once whether messages (tuples) are available on the Kafka server.
If so, all messages are consecutively gathered and sent downstream to the subscribers (that is,
the following operator(s)); then it finishes. However, it can easily be adapted to keep waiting
and repeatedly ask the server whether new messages have arrived.
An example (test case) showing how to use it properly can be found in
`/test/KafkaSourceTest.cpp` of the source folder.
## MQTT ##
### Preliminaries and Installation ###
For MQTT:
+ [Eclipse Mosquitto](https://mosquitto.org/)
+ [Eclipse Paho C](https://github.com/eclipse/paho.mqtt.c.git), available on GitHub
+ [Eclipse Paho C++](https://github.com/eclipse/paho.mqtt.cpp.git), available on GitHub
Eclipse Mosquitto delivers what is necessary for running an MQTT server. On Linux systems, the
server and the client software can be installed with the command
`sudo apt-get install mosquitto mosquitto-clients`. Eclipse Paho provides the libraries and headers
for C++ support. Because Eclipse Paho C++ references the Eclipse Paho C installation, both are
necessary and have to be installed in a way that CMake can find them (libraries and headers).
MQTT is then enabled in PipeFabric by switching the CMake variable `USE_MQTT` to `ON`. This can be
done manually in the CMakeLists.txt file in the src folder or by passing `-DUSE_MQTT=ON` to cmake,
like `cmake -DUSE_MQTT=ON ../src`.
In addition, you have to start the MQTT server before running the test case. This can be done on
the console with the command `mosquitto`. Otherwise the test case will fail with the error
`MQTT error [-1]: TCP/TLS connect failure`.
### Usage ###
PipeFabric provides an interface to the MQTT server in which all currently available messages on
the server are gathered, transformed to tuples and forwarded to the query. This is done by using
the operator `newStreamFromMQTT`:
`Pipe<TStringPtr> Topology::newStreamFromMQTT(const std::string& conn, const std::string& channel)`
Each incoming message of the MQTT server produces a single tuple (consisting of a single string).
The parameter `conn` specifies the server address and port. The `channel` is simply the name of the
topic on which the messages are exchanged.
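Again as a sketch, with the broker URI `tcp://localhost:1883` and the topic "sensor/temperature"
used only as example values:

```
#include <iostream>
#include "pfabric.hpp"

using namespace pfabric;

int main() {
  PFabricContext ctx;
  auto t = ctx.createTopology();

  // read the messages currently published on the topic "sensor/temperature"
  // from a local Mosquitto broker and print them
  auto s = t->newStreamFromMQTT("tcp://localhost:1883", "sensor/temperature")
    .print(std::cout);

  t->start();
  return 0;
}
```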
The operator currently checks once whether messages (tuples) are available on the MQTT server.
If so, all messages are consecutively gathered and sent downstream to the subscribers (that is,
the following operator(s)); then it finishes. However, it can easily be adapted to keep waiting
and repeatedly ask the server whether new messages have arrived.
An example (test case) showing how to use it properly can be found in
`/test/MQTTSourceTest.cpp` of the source folder.
## Use Cases for PipeFabric ##
Here we describe some use cases for PipeFabric to give you an insight into the possibilities and
the effort required to use it. We provide the following topics:
+ DEBS2017, a research challenge for detecting anomalies in RDF streaming data
+ Image and Graph Processing, realized by matrix operations
+ Processing of Movement Trajectories, tracking movement of different objects (coming soon)
To run them on your own, the necessary (simple) installation steps are given below, since the use
cases are not built by default in PipeFabric. In addition, a summary of the most important facts
about each of the topics above is given.
### Installation ###
First, build PipeFabric as described in the [Installation description](documentation/Installation.md),
running the test cases to check that everything works. To enable building the use cases, you have to
switch a CMake variable called `BUILD_USE_CASES` to `ON`. You can directly edit the `CMakeLists.txt`
file or pass the option `-DBUILD_USE_CASES=ON` when building (e.g. `cmake -DBUILD_USE_CASES=ON ../src`).
Remember to delete the `CMakeCache.txt` first, or the change of the variable will not be recognized!
After the build is finished, you can run the use cases independently.
+ DEBS2017: In your build folder, just run `./usecases/DEBS2017/debs2017` on your command line.
### DEBS2017 ###
#### Challenge description ####
The [DEBS2017 Grand Challenge](https://project-hobbit.eu/challenges/debs-grand-challenge/) is part of
a yearly recurring series, each edition facing a different problem of processing data streams. The
objective of the 2017 challenge was to process and analyze RDF streaming data produced by the digital
and analogue sensors of manufacturing equipment, with the final goal of detecting anomalies in the
sensor measurements.
The approach is constrained as follows: the sensor data is first properly clustered, and the state
transitions between clusters over time are observed (modeled as a Markov chain). Anomalies are then
detectable as sequences of transitions that are uncommon, i.e., whose probability is lower than a
certain threshold.
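To illustrate the idea (this is not the actual challenge implementation, just a toy calculation):
the probability of an observed sequence of cluster transitions can be computed from a transition
matrix and compared against the threshold.

```
#include <cstddef>
#include <vector>

// Illustrative only: probability of a sequence of cluster states under a
// first-order Markov chain, compared against an anomaly threshold.
bool isAnomaly(const std::vector<std::vector<double>>& transitionProb,
               const std::vector<int>& clusterSequence,
               double threshold) {
  double p = 1.0;
  for (std::size_t i = 1; i < clusterSequence.size(); ++i) {
    // multiply the probabilities of the observed transitions
    p *= transitionProb[clusterSequence[i - 1]][clusterSequence[i]];
  }
  return p < threshold;  // an uncommon sequence indicates an anomaly
}
```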
Besides the difficulty of building and updating such a Markov chain without missing anomalies in the
later classification step, the challenge lies in providing high-throughput, low-latency processing.
The sensor data may arrive as a real-time stream, so the anomaly detection has to react quickly to an
individual anomaly while being efficient enough to allow parallel processing of multiple sensor
inputs.
#### Solution with PipeFabric ####
Four datasets are available for testing purposes; they differ only in size and in the number of
anomalies they contain. We deliver the second dataset (10 MB in size) with the use case, but the
solution also works for larger datasets.
First, the given metadata of the challenge is read from a file and stored in appropriate data
structures. After that, the stream of RDF data is started, ready for processing. The information
stored in RDF (like timestamps or the machine identifier) is extracted and forwarded as a
well-structured tuple. An additional window operator ensures that only tuples up to a certain age
are considered.
The clustering, along with the updating of existing clusters, is realized in a single customized
operator: a clustering state is stored and updated accordingly whenever the next tuple arrives.
After the clustering, the Markov chain (responsible for finding anomalies), also implemented as a
single customized operator, receives the cluster data. If the transition probability is below the
given threshold, an anomaly is found and reported by a simple print on the console.
For data from real sensors on real manufacturing machines, this print could also be connected to a
warning or whatever the reaction to an anomaly should be.
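A strongly simplified sketch of such a dataflow is shown below. The tuple layouts, the input file
name, the window size, and the clustering/Markov helpers are placeholders invented for this
illustration; the customized operators of the real solution live in the use case sources.

```
#include <iostream>
#include "pfabric.hpp"

using namespace pfabric;

// hypothetical, simplified tuple layouts (the real use case extracts more fields from RDF)
typedef TuplePtr<int, int, double> ObservationPtr;  // machine id, timestamp, sensor value
typedef TuplePtr<int, int> ClusterPtr;              // machine id, assigned cluster

// illustrative stand-ins for the customized clustering and Markov chain operators
int assignToCluster(double value) { return value < 50.0 ? 0 : 1; }
double transitionProbability(int from, int to) {
  static const double p[2][2] = {{0.9, 0.1}, {0.7, 0.3}};
  return p[from][to];
}

int main() {
  PFabricContext ctx;
  auto t = ctx.createTopology();
  const double threshold = 0.2;
  int lastCluster = 0;  // state for a single machine, for brevity

  t->newStreamFromFile("debs2017_sample.csv")           // input, simplified to CSV here
    .extract<ObservationPtr>(',')                       // parse into structured tuples
    .slidingWindow(WindowParams::RangeWindow, 60)       // only regard tuples up to a certain age
    .map<ClusterPtr>([](auto tp, bool) -> ClusterPtr {  // clustering step
      return makeTuplePtr(get<0>(tp), assignToCluster(get<2>(tp)));
    })
    .notify([&](auto tp, bool) {                        // Markov chain / anomaly check
      int cluster = get<1>(tp);
      if (transitionProbability(lastCluster, cluster) < threshold)
        std::cout << "anomaly detected for machine " << get<0>(tp) << std::endl;
      lastCluster = cluster;
    });

  t->start();
  return 0;
}
```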
### Image and Graph Processing ###
Requires the OpenCV and Eigen libraries to be installed (tested with OpenCV 3.3.1 and Eigen 3.3.1).
### Movement Trajectories ###
(coming soon!)
@@ -8,29 +8,54 @@ set (PipeFabric_VERSION_MINOR 2)
include(CTest)
#############################
# customization section
#############################
#
################################################################################
# customization section #
################################################################################
# Installation path
set(PIPEFABRIC_DIR "/usr/local/pfabric")
# Set to 1 if you need log output
add_definitions(-DDO_LOG=0)
# The following variables enable or disable additional functionalities,
# which can be switched off to reduce build time.
# Support Matrix Operations (needs Eigen library to be installed)
option(SUPPORT_MATRICES
"support matrix operations as tuple and state type"
OFF
)
# Build use cases
option(BUILD_USE_CASES
"build use cases to show functionality examples"
OFF
)
# Use RabbitMQ as network source
option(USE_RABBITMQ
"use RabbitMQ as network source"
OFF
)
# Use Apache Kafka as network source
option(USE_KAFKA
"use Apache Kafka as network source"
OFF
)
# Use MQTT as network source
option(USE_MQTT
"use MQTT as network source"
OFF
)
# Use the boost::spirit parser for converting strings to numbers
option(USE_BOOST_SPIRIT_PARSER
"use the boost::spirit::qi parsers for converting strings to tuple attributes"
ON
"use the boost::spirit::qi parsers for converting strings to tuple attributes"
ON
)
# Use RocksDB key-value store for implementing tables
# If switched to off, it will not be downloaded, saving initial building time.
option(USE_ROCKSDB_TABLE
"use RocksDB for implementing persistent tables"
"use RocksDB for implementing persistent tables"
OFF
)
@@ -58,15 +83,24 @@ option(BUILD_TEST_CASES
#Build google benchmark library
option(BUILD_GOOGLE_BENCH
"build google benchmark"
OFF
ON
)
# Build benchmark test
option(BUILD_BENCHMARKS
"build benchmarks for pipefabric"
OFF
ON
)
################################
# End of customization section #
################################
# Use cases require matrix support (image and graph processing)
if(BUILD_USE_CASES)
set(SUPPORT_MATRICES ON)
endif()
# Benchmark test requires benchmark library
if (BUILD_BENCHMARKS)
set(BUILD_GOOGLE_BENCH ON)
@@ -89,14 +123,10 @@ endif()
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -Wno-unused -Wno-uninitialized")
# End of customization section
#---------------------------------------------------------------------------
# Add our CMake directory to CMake's module path
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/../cmake/")
# We download some 3rdparty modules from github.com before building the project.
include(Download3rdParty)
if(NOT ${CMAKE_SYSTEM_NAME} MATCHES "Darwin")
@@ -107,9 +137,9 @@ else()
set(DYLIB_LIBRARY "")
endif()
##############################
# memory allocator libraries #
##############################
#
find_package(JeMalloc)
find_package(Tcmalloc)
@@ -185,7 +215,6 @@ else ()
set (NVML_LIBRARIES "")
endif()
######################
# Boost C++ library
######################
@@ -245,6 +274,27 @@ if (ZEROMQ_FOUND)
link_directories(${ZEROMQ_LIBRARY_DIR})
endif(ZEROMQ_FOUND)
#-----------------------------------------------------------------------------------------
#
# Matrix Support
#
if(SUPPORT_MATRICES)
####################################
# Eigen library for linear algebra #
####################################
find_package(Eigen3)
if(EIGEN3_FOUND)
message(STATUS "using Eigen3 library for linear algebra")
include_directories(${EIGEN3_INCLUDE_DIR})
else()
message(STATUS "Eigen3 not found")
endif()
add_definitions(-DSUPPORT_MATRICES)
endif()
#-----------------------------------------------------------------------------------------
#
# Building PipeFabric core library
@@ -283,6 +333,13 @@ set(core_sources
${THIRD_PARTY_DIR}/catch
)
if(USE_ROCKSDB_TABLE)
set(core_sources
${core_sources}
${THIRD_PARTY_DIR}/rocksdb
)
endif()
if(BUILD_GOOGLE_BENCH)
set(core_sources
${core_sources}
@@ -290,33 +347,94 @@ if(BUILD_GOOGLE_BENCH)
)
endif()
set(core_libs
${BOOST_LIBRARIES}
${ZEROMQ_LIBRARIES}
)
#-----------------------------------------------------------------------------------------
#
##########
# RabbitMQ
##########
#
#
if(USE_RABBITMQ)
add_definitions(-DUSE_RABBITMQ)
set(core_libs
${core_libs}
amqpcpp
rabbitmq
)
set(core_sources
${core_sources}
net/RabbitMQSource.cpp
)
endif()
#-----------------------------------------------------------------------------------------
#
##############
# Apache Kafka
##############
#
#
if(USE_KAFKA)
add_definitions(-DUSE_KAFKA)
set(core_libs
${core_libs}
cppkafka
rdkafka
)
set(core_sources
${core_sources}
net/KafkaSource.cpp
)
endif()
#-----------------------------------------------------------------------------------------
#
######
# MQTT
######
#
#
if(USE_MQTT)
add_definitions(-DUSE_MQTT)
set(core_libs
${core_libs}
paho-mqtt3c
paho-mqtt3cs
paho-mqtt3a
paho-mqtt3as
paho-mqttpp3
)
set(core_sources
${core_sources}
net/MQTTSource.cpp
)
endif()
if(USE_NVML_TABLE)
add_definitions(-DDO_LOG=0)
set(core_sources
${core_sources}
${THIRD_PARTY_DIR}/nvml
${THIRD_PARTY_DIR}/ptable
)
set(core_libs
${core_libs}
${NVML_LIBRARIES}
)
endif()

add_library(pfabric_core SHARED
${core_sources}
)
target_link_libraries(pfabric_core
${core_libs}
)