Commit ce9fc092 authored by Kai-Uwe Sattler

Use case: frequent trajectories added

parents 0f0733ee 5bd22964
......@@ -21,3 +21,4 @@ operators and utility classes. It consists of the following main components:
+ [Tutorial: Embedding PipeFabric](/documentation/Embedding.md)
+ [Tutorial: Stream partitioning](/documentation/Partitioning.md)
+ [Tutorial: How to build and use a Docker image](/documentation/Docker.md)
+ [PipeFabric Use Cases](/documentation/UseCases.md)
......@@ -48,6 +48,12 @@ add_custom_command(
COMMAND ${CMAKE_COMMAND} -E copy
${Format_SOURCE_DIR}/fmt/format.h
${THIRD_PARTY_DIR}/fmt
COMMAND ${CMAKE_COMMAND} -E copy
${Format_SOURCE_DIR}/fmt/ostream.h
${THIRD_PARTY_DIR}/fmt
COMMAND ${CMAKE_COMMAND} -E copy
${Format_SOURCE_DIR}/fmt/ostream.cc
${THIRD_PARTY_DIR}/fmt
COMMAND ${CMAKE_COMMAND} -E copy
${Format_SOURCE_DIR}/fmt/format.cc
${THIRD_PARTY_DIR}/fmt)
......@@ -117,3 +123,17 @@ add_custom_command(
)
endif()
#--------------------------------------------------------------------------------
if(BUILD_USE_CASES)
# data for use cases
download_project(PROJ data
GIT_REPOSITORY https://github.com/dbis-ilm/data.git
GIT_TAG master
UPDATE_DISCONNECTED 1
QUIET
)
file(COPY ${PROJECT_BINARY_DIR}/data-src/DEBS2017
DESTINATION ${THIRD_PARTY_DIR}
)
endif()
# - Try to find Eigen3 lib
#
# This module supports requiring a minimum version, e.g. you can do
# find_package(Eigen3 3.1.2)
# to require version 3.1.2 or newer of Eigen3.
#
# Once done this will define
#
# EIGEN3_FOUND - system has eigen lib with correct version
# EIGEN3_INCLUDE_DIR - the eigen include directory
# EIGEN3_VERSION - eigen version
#
# This module reads hints about search locations from
# the following environment variables:
#
# EIGEN3_ROOT
# EIGEN3_ROOT_DIR
# Copyright (c) 2006, 2007 Montel Laurent, <montel@kde.org>
# Copyright (c) 2008, 2009 Gael Guennebaud, <g.gael@free.fr>
# Copyright (c) 2009 Benoit Jacob <jacob.benoit.1@gmail.com>
# Redistribution and use is allowed according to the terms of the 2-clause BSD license.
if(NOT Eigen3_FIND_VERSION)
if(NOT Eigen3_FIND_VERSION_MAJOR)
set(Eigen3_FIND_VERSION_MAJOR 2)
endif(NOT Eigen3_FIND_VERSION_MAJOR)
if(NOT Eigen3_FIND_VERSION_MINOR)
set(Eigen3_FIND_VERSION_MINOR 91)
endif(NOT Eigen3_FIND_VERSION_MINOR)
if(NOT Eigen3_FIND_VERSION_PATCH)
set(Eigen3_FIND_VERSION_PATCH 0)
endif(NOT Eigen3_FIND_VERSION_PATCH)
set(Eigen3_FIND_VERSION "${Eigen3_FIND_VERSION_MAJOR}.${Eigen3_FIND_VERSION_MINOR}.${Eigen3_FIND_VERSION_PATCH}")
endif(NOT Eigen3_FIND_VERSION)
macro(_eigen3_check_version)
file(READ "${EIGEN3_INCLUDE_DIR}/Eigen/src/Core/util/Macros.h" _eigen3_version_header)
string(REGEX MATCH "define[ \t]+EIGEN_WORLD_VERSION[ \t]+([0-9]+)" _eigen3_world_version_match "${_eigen3_version_header}")
set(EIGEN3_WORLD_VERSION "${CMAKE_MATCH_1}")
string(REGEX MATCH "define[ \t]+EIGEN_MAJOR_VERSION[ \t]+([0-9]+)" _eigen3_major_version_match "${_eigen3_version_header}")
set(EIGEN3_MAJOR_VERSION "${CMAKE_MATCH_1}")
string(REGEX MATCH "define[ \t]+EIGEN_MINOR_VERSION[ \t]+([0-9]+)" _eigen3_minor_version_match "${_eigen3_version_header}")
set(EIGEN3_MINOR_VERSION "${CMAKE_MATCH_1}")
set(EIGEN3_VERSION ${EIGEN3_WORLD_VERSION}.${EIGEN3_MAJOR_VERSION}.${EIGEN3_MINOR_VERSION})
if(${EIGEN3_VERSION} VERSION_LESS ${Eigen3_FIND_VERSION})
set(EIGEN3_VERSION_OK FALSE)
else(${EIGEN3_VERSION} VERSION_LESS ${Eigen3_FIND_VERSION})
set(EIGEN3_VERSION_OK TRUE)
endif(${EIGEN3_VERSION} VERSION_LESS ${Eigen3_FIND_VERSION})
if(NOT EIGEN3_VERSION_OK)
message(STATUS "Eigen3 version ${EIGEN3_VERSION} found in ${EIGEN3_INCLUDE_DIR}, "
"but at least version ${Eigen3_FIND_VERSION} is required")
endif(NOT EIGEN3_VERSION_OK)
endmacro(_eigen3_check_version)
if (EIGEN3_INCLUDE_DIR)
# in cache already
_eigen3_check_version()
set(EIGEN3_FOUND ${EIGEN3_VERSION_OK})
else (EIGEN3_INCLUDE_DIR)
# search first if an Eigen3Config.cmake is available in the system,
# if successful this would set EIGEN3_INCLUDE_DIR and the rest of
# the script will work as usual
find_package(Eigen3 ${Eigen3_FIND_VERSION} NO_MODULE QUIET)
if(NOT EIGEN3_INCLUDE_DIR)
find_path(EIGEN3_INCLUDE_DIR NAMES signature_of_eigen3_matrix_library
HINTS
ENV EIGEN3_ROOT
ENV EIGEN3_ROOT_DIR
PATHS
${CMAKE_INSTALL_PREFIX}/include
${KDE4_INCLUDE_DIR}
PATH_SUFFIXES eigen3 eigen
)
endif(NOT EIGEN3_INCLUDE_DIR)
if(EIGEN3_INCLUDE_DIR)
_eigen3_check_version()
endif(EIGEN3_INCLUDE_DIR)
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(Eigen3 DEFAULT_MSG EIGEN3_INCLUDE_DIR EIGEN3_VERSION_OK)
mark_as_advanced(EIGEN3_INCLUDE_DIR)
endif(EIGEN3_INCLUDE_DIR)
## Use Cases for PipeFabric ##
Here we describe some use cases for PipeFabric to give you an insight into the possibilities
and the effort required to use it. We cover the following topics:
+ DEBS2017, a research challenge for detecting anomalies in RDF streaming data
+ Image and Graph Processing, realized by matrix operations
+ Processing of Movement Trajectories, tracking the movement of different objects (coming soon)
To run them on your own, we give the necessary installation steps below, because the use cases are
not built by default in PipeFabric. In addition, a summary of the most important facts about each
topic is given.
### Installation ###
First, build PipeFabric as described in the [Installation description](documentation/Installation.md) and
run the test cases to check that everything works. To enable building the use cases, you have to switch
the CMake variable `BUILD_USE_CASES` to `ON`. You can directly edit the `CMakeLists.txt` file or pass
`-DBUILD_USE_CASES=ON` on the command line (e.g. `cmake -DBUILD_USE_CASES=ON ../src`). Remember to delete
the `CMakeCache.txt` file first, or the changed variable will not be picked up!
After the build has finished, you can run the use cases independently.
+ DEBS2017: In your build folder, just run `./usecases/DEBS2017/debs2017` on the command line.
### DEBS2017 ###
#### Challenge description ####
The [DEBS2017 Grand Challenge](https://project-hobbit.eu/challenges/debs-grand-challenge/) is part of a yearly
recurring series that addresses different problems in processing data streams. The objective of the 2017
challenge was to process and analyze RDF streaming data produced by the digital and analogue sensors of
manufacturing equipment. The final result should be the anomalies detected in the sensor measurements.
The following constraints are given: the data of the sensors is first properly clustered, and the state
transitions between clusters over time are observed (modeled as a Markov chain). Anomalies are then detectable
as sequences of transitions that are uncommon, expressed by a transition probability lower than a certain
threshold.
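To make this criterion concrete, here is a minimal sketch of such a probability check in C++. All names
(`TransitionMatrix`, `isAnomaly`) and the maximum-likelihood estimate are our own illustration, not code from
the challenge or from PipeFabric:
```cpp
#include <unordered_map>

// Hypothetical transition table of a Markov chain over cluster IDs:
// counts how often cluster `from` was directly followed by cluster `to`.
struct TransitionMatrix {
  std::unordered_map<long, std::unordered_map<long, long>> counts;
  std::unordered_map<long, long> totals;

  void update(long from, long to) {
    ++counts[from][to];
    ++totals[from];
  }

  // maximum-likelihood estimate of the probability of the transition from -> to
  double probability(long from, long to) const {
    auto total = totals.find(from);
    if (total == totals.end() || total->second == 0) return 0.0;
    const auto& row = counts.at(from);
    auto cnt = row.find(to);
    return cnt == row.end() ? 0.0 : static_cast<double>(cnt->second) / total->second;
  }
};

// a transition counts as an anomaly if its probability is below the threshold
bool isAnomaly(const TransitionMatrix& m, long from, long to, double threshold) {
  return m.probability(from, to) < threshold;
}
```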
Besides the difficulty of building and updating such a Markov chain without missing anomalies in the later
classification step, the challenge lies in providing high-throughput, low-latency processing. The sensor data
may arrive as a real-time stream, so the anomaly detection has to be fast enough to react to each individual
anomaly in a short time, as well as efficient enough to allow parallel processing of multiple sensor inputs.
#### Solution with PipeFabric ####
Four datasets are available for testing purposes. They differ only in their size and in the number of anomalies
they contain. We deliver the second dataset (10 MB in size) with the use case, but the solution also works for
the bigger datasets.
First, the metadata of the challenge is read from a file and stored in appropriate data structures. After that,
the stream of RDF data is started, ready for processing. The information stored in RDF (like timestamps or the
machine identifier) is extracted and forwarded as a well-structured tuple. An additional window operator ensures
that only tuples up to a certain age are taken into account.
The clustering, along with the updating of existing clusters, is realized next in a single customized operator.
For this purpose, a clustering state is stored and updated accordingly whenever the next tuple arrives. After
the clustering, the Markov chain (responsible for finding anomalies) receives the cluster data; it is also
implemented as a single customized operator. If the transition probability is below the given threshold, an
anomaly has been found and is reported by a simple print on the console.
For data from real sensors on real manufacturing machines, this print could be replaced by a warning or by
whatever the appropriate reaction to an anomaly is.
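The following sketch shows roughly how such a dataflow can be written as a PipeFabric topology, following the
style of the embedding tutorial. The file name, tuple layout, window size, and the `notify` placeholder standing
in for the custom clustering and Markov chain operators are illustrative assumptions, not the actual DEBS2017
implementation:
```cpp
#include <iostream>
#include "pfabric.hpp"

using namespace pfabric;

// simplified stand-in for the extracted RDF data: <timestamp, machine id, sensor value>
typedef TuplePtr<Timestamp, int, double> SensorTuple;

int main() {
  PFabricContext ctx;
  auto t = ctx.createTopology();

  auto s = t->newStreamFromFile("debs2017_data.csv")
    .extract<SensorTuple>(',')                     // RDF information -> well-structured tuple
    .slidingWindow(WindowParams::RangeWindow, 60)  // only regard tuples up to a certain age
    .notify([](auto tp, bool outdated) {
      // the customized clustering and Markov chain operators
      // of the use case would be plugged in at this point
    })
    .print(std::cout);                             // detected anomalies end up on the console

  t->start();
  t->wait();
  return 0;
}
```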
### Image and Graph Processing ###
Requires the OpenCV and Eigen libraries to be installed (tested with OpenCV 3.3.1 and Eigen 3.3.1).
### Movement Trajectories ###
(coming soon!)
......@@ -8,20 +8,30 @@ set (PipeFabric_VERSION_MINOR 2)
include(CTest)
################################################################################
# customization section                                                        #
################################################################################
# Installation path
set(PIPEFABRIC_DIR "/usr/local/pfabric")
# The following variables enable or disable additional functionalities,
# which can be switched off to reduce build time.
# Support Matrix Operations (needs Eigen library to be installed)
option(SUPPORT_MATRICES
"support matrix operations as tuple and state type"
OFF)
# Build use cases
option(BUILD_USE_CASES
""
ON)
# Use the boost::spirit parser for converting strings to numbers
option(USE_BOOST_SPIRIT_PARSER
  "use the boost::spirit::qi parsers for converting strings to tuple attributes"
  ON
)
# Use RocksDB key-value store for implementing tables
......@@ -43,13 +53,13 @@ option(BUILD_ONLY_LIBS
# If switched to off, no tests will be build
option(BUILD_TEST_CASES
  "build tests for pipefabric functionality"
  ON
)
# Build google benchmark library
option(BUILD_GOOGLE_BENCH
  "build google benchmark"
  ON
)
# Build benchmark test
......@@ -58,6 +68,15 @@ option(BUILD_BENCHMARKS
ON
)
################################
# End of customization section #
################################
# Use cases require matrix support (image and graph processing)
if(BUILD_USE_CASES)
set(SUPPORT_MATRICES ON)
endif()
# Benchmark test requires benchmark library
if (BUILD_BENCHMARKS)
set(BUILD_GOOGLE_BENCH ON)
......@@ -80,14 +99,10 @@ endif()
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -Wno-unused -Wno-uninitialized")
#---------------------------------------------------------------------------
# Add our CMake directory to CMake's module path
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/../cmake/")
# We download some 3rdparty modules from github.com before building the project.
include(Download3rdParty)
if(NOT ${CMAKE_SYSTEM_NAME} MATCHES "Darwin")
......@@ -98,9 +113,9 @@ else()
set(DYLIB_LIBRARY "")
endif()
##############################
# memory allocator libraries #
##############################
#
find_package(JeMalloc)
find_package(Tcmalloc)
......@@ -210,6 +225,27 @@ if (ZEROMQ_FOUND)
link_directories(${ZEROMQ_LIBRARY_DIR})
endif(ZEROMQ_FOUND)
#-----------------------------------------------------------------------------------------
#
# Matrix Support
#
if(SUPPORT_MATRICES)
####################################
# Eigen library for linear algebra #
####################################
find_package(Eigen3)
if(EIGEN3_FOUND)
message(STATUS "using Eigen3 library for linear algebra")
include_directories(${EIGEN3_INCLUDE_DIR})
else()
message(STATUS "Eigen3 not found")
endif()
add_definitions(-DSUPPORT_MATRICES)
endif()
#-----------------------------------------------------------------------------------------
#
# Building PipeFabric core library
......@@ -312,6 +348,24 @@ add_subdirectory(test)
# Micro-benchmarking using Google Benchmark
#
add_subdirectory(bench)
#-----------------------------------------------------------------------------------------
#
# Build use cases
#
if(BUILD_USE_CASES)
#######################################
# OpenCV library for image processing #
#######################################
find_package(OpenCV REQUIRED)
if(OpenCV_FOUND)
message(STATUS "using OpenCV library for image processing")
include_directories(${OpenCV_INCLUDE_DIR})
else()
message(STATUS "OpenCV not found")
endif()
endif()
add_subdirectory(usecases)
#-----------------------------------------------------------------------------------------
#
# Installation
......
......@@ -251,8 +251,7 @@ void TopologyPartitionedJoinTest(benchmark::State& state) {
.join<int>(s1, [](auto tp1, auto tp2) { return true; })
.merge();
t.start(false);
//BAD: Takes far too long because of iteration number
//wait for results - stop timer
//state.PauseTiming();
......
......@@ -39,6 +39,11 @@
namespace qi = boost::spirit::qi;
#endif
#ifdef SUPPORT_MATRICES
#include "core/StreamElementTraits.hpp"
#include "matrix/Matrix.hpp"
#include "matrix/VectorParser.hpp"
#endif
namespace pfabric {
......@@ -244,6 +249,58 @@ public:
}
};
#ifdef SUPPORT_MATRICES
/**
 * @brief Attribute parser for a sparse vector.
 **/
template<typename CellType>
class StringAttributeParser<pfabric::SparseVector<CellType> > :
public AttributeParserBase<pfabric::SparseVector<CellType>, StringAttributeParser<pfabric::SparseVector<CellType> > > {
public:
// the attribute should be initialized
typedef pfabric::SparseVector<CellType> Attribute;
/**
 * @brief Parse reads a vector of values from an attribute of a tuple (e.g. TuplePtr<int, int, "v1 v2 v3 ...">).
 *
 * @param[in] input
 *   the string of values (v1 v2 v3 ...)
 * @param[out] vector
 *   the sparse vector to be initialized from the string of values.
 */
static inline
void parse(const std::string &input, Attribute &vector) {
VectorParser::parse(input, vector);
}
};
/**
 * @brief Attribute parser for a dense matrix.
 */
template<typename CellType, int Rows, int Cols>
class StringAttributeParser<pfabric::DenseMatrix<CellType, Rows, Cols> > :
public AttributeParserBase<pfabric::DenseMatrix<CellType, Rows, Cols>, StringAttributeParser<pfabric::DenseMatrix<CellType, Rows, Cols>>> {
public:
// the attribute should be initialized
typedef pfabric::DenseMatrix<CellType, Rows, Cols> Attribute;
/**
 * @brief Parse reads a matrix of values from an attribute of a tuple (e.g. TuplePtr<int, int, "v1 v2 v3 ...">).
 *
 * @param[in] input
 *   the string of values (v1 v2 v3 ...)
 * @param[out] vector
 *   the dense matrix to be initialized from the string of values.
 */
static inline
void parse(const std::string &input, Attribute &vector) {
VectorParser::parse(input, vector);
}
};
#endif
} /* end namespace pfabric */
......
......@@ -10,23 +10,4 @@ if (NOT BUILD_ONLY_LIBS)
${Boost_FILESYSTEM_LIBRARY}
${Boost_THREAD_LIBRARY}
)
add_executable(TrackerServer TrackerServer.cpp
TrajectoryDB.cpp
GeoUtils.cpp
QueryLoop.cpp
WebServer.cpp)
target_link_libraries(TrackerServer
pfabric_qcomp
pfabric_core
${ROCKSDB_LIB}
${Boost_PROGRAM_OPTIONS_LIBRARY}
${Boost_DATE_TIME_LIBRARY}
${Boost_FILESYSTEM_LIBRARY}
${Boost_SYSTEM_LIBRARY}
${Boost_LOG_LIBRARY}
${Boost_FILESYSTEM_LIBRARY}
${Boost_THREAD_LIBRARY}
)
endif()
......@@ -28,6 +28,9 @@
#include "table/Table.hpp"
#include "qop/Queue.hpp"
#include "dsl/Topology.hpp"
#ifdef SUPPORT_MATRICES
#include "matrix/Matrix.hpp"
#endif
namespace pfabric {
......@@ -144,6 +147,28 @@ public:
TableInfoPtr getTableInfo(const std::string& tblName);
#ifdef SUPPORT_MATRICES
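/**
 * @brief Creates and registers a new matrix of type T under the given name.
 *        Throws std::logic_error if a matrix with this name already exists.
 */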
template<typename T>
std::shared_ptr<T> createMatrix(const std::string &matrixName) {
auto it = matrixMap.find(matrixName);
if(it != matrixMap.end()) {
throw std::logic_error("matrix already exists");
}
auto m = std::make_shared<T>();
matrixMap[matrixName] = m;
return m;
}
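/**
 * @brief Returns the matrix registered under the given name, cast to type T.
 *        Throws std::logic_error if no matrix with this name exists.
 */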
template<typename T>
std::shared_ptr<T> getMatrix(const std::string &matrixName) {
auto it = matrixMap.find(matrixName);
if (it != matrixMap.end()) {
return std::static_pointer_cast<T>(it->second);
}
throw std::logic_error("matrix not found");
}
#endif
/**
* @brief Creates a new stream with the given name and schema.
*
......@@ -173,7 +198,11 @@ private:
std::map<std::string, BaseTablePtr> mTableSet; //< a dictionary collecting all existing tables
std::map<std::string, Dataflow::BaseOpPtr> mStreamSet; //< a dictionary collecting all named streams
#ifdef SUPPORT_MATRICES
  typedef std::shared_ptr<BaseMatrix> BaseMatrixPtr;
  std::map<std::string, BaseMatrixPtr> matrixMap; //< a dictionary collecting all existing matrices
#endif
};
}
......
......@@ -57,6 +57,12 @@
#include "qop/Tuplifier.hpp"
#include "qop/Where.hpp"
#include "qop/ZMQSink.hpp"
#include "qop/ScaleJoin.hpp"
#ifdef SUPPORT_MATRICES
#include "qop/ToMatrix.hpp"
#include "qop/MatrixSlice.hpp"
#include "qop/MatrixMerge.hpp"
#endif
namespace pfabric {
......@@ -657,6 +663,17 @@ class Pipe {
return Pipe<BatchPtr<T>>(dataflow, iter, keyExtractor, timestampExtractor,
partitioningState, numPartitions);
}
/*
* TODO: This doesn't work. Find a better way!
* Pipe<T::element_type> unbatch() throw(TopologyException) {
* auto op = std::make_shared<UnBatcher<T::element_type>>();
* auto iter = addPublisher<UnBatcher<T::element_type>, DataSource<T::element_type>>(op);
* return Pipe<T::element_type>(dataflow, iter, keyExtractor, timestampExtractor,
* partitioningState, numPartitions);
* }
*/
/**
* @brief
*
......@@ -1257,6 +1274,101 @@ class Pipe {
}
}
/**
* @brief Creates an operator for joining two streams represented by pipes.
* Origin idea & paper: "ScaleJoin: a Deterministic, Disjoint-Parallel and
* Skew-Resilient Stream Join" (2016)
*
* Creates an operator implementing a ScaleJoin to join two streams.
* In addition a join predicate can be specified. Note, that the output
* tuple type is derived from the two input types.
*
* @tparam T
* the input tuple type (usually a TuplePtr) of the left stream.
* @tparam T2
* the input tuple type (usually a TuplePtr) of the right stream.
* @tparam KeyType
* the data type for representing keys (join values)
* @param[in] otherPipe
* the pipe representing the right stream
* @param[in] pred
* the join predicate
* @param[in] threadnum
* the number of threads for parallel joining
* @return a new pipe
*/
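// A possible call site (illustration only, not part of this commit): both pipes
// need a key extractor assigned (e.g. via keyBy()) before scaleJoin is invoked:
//
//   auto joined = leftPipe.scaleJoin<int>(rightPipe,
//       [](auto tp1, auto tp2) { return true; },  // join predicate
//       4);                                       // run four join threads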
template <typename KeyType = DefaultKeyType, typename T2>
Pipe<typename ScaleJoin<T, T2, KeyType>::ResultElement> scaleJoin(
Pipe<T2>& otherPipe, typename ScaleJoin<T, T2, KeyType>::JoinPredicateFunc pred, const int threadnum)
throw(TopologyException) {
typedef typename ScaleJoin<T, T2, KeyType>::ResultElement Tout;
try {
typedef std::function<KeyType(const T&)> LKeyExtractorFunc;
typedef std::function<KeyType(const T2&)> RKeyExtractorFunc;
//specify the keys of tuples
LKeyExtractorFunc fn1 = boost::any_cast<LKeyExtractorFunc>(keyExtractor);
RKeyExtractorFunc fn2 = boost::any_cast<RKeyExtractorFunc>(otherPipe.keyExtractor);
//get the source operators preceding the scaleJoin operator (left and right stream)
auto pOp = castOperator<DataSource<T> >(getPublisher());
auto otherOp = castOperator<DataSource<T2> >(otherPipe.getPublisher());
//partitioning is not necessary, the join is already multi-threaded
assert(partitioningState == NoPartitioning);
assert(otherPipe.partitioningState == NoPartitioning);
assert(threadnum > 0);
//vectors for the join operators as well as the queues (decoupling the threads)
std::vector<std::shared_ptr<ScaleJoin<T, T2, KeyType> > > scJoinVec;
std::vector<std::shared_ptr<Queue<T> > > scQueueVec;
//queue for collecting join results, forwarding as a single stream
auto combine = std::make_shared<Queue<Tout>>();
//create one queue and one scaleJoin instance per thread, as specified by threadnum
for (auto i=0; i<threadnum; i++) {
//create queue and scaleJoin instances
auto qu = std::make_shared<Queue<T>>();
auto scJoin = std::make_shared<ScaleJoin<T, T2, KeyType> >(fn1, fn2, pred, i, threadnum);
//connect the output of the preceding operator of the left stream with the input of the current queue instance
CREATE_LINK(pOp, qu);
//connect output of queue instance with input of scaleJoin instance
connectChannels(qu->getOutputDataChannel(), scJoin->getLeftInputDataChannel());
connectChannels(qu->getOutputPunctuationChannel(), scJoin->getInputPunctuationChannel());
//connect the output of the preceding operator of the right stream with the input of the scaleJoin instance
connectChannels(otherOp->getOutputDataChannel(), scJoin->getRightInputDataChannel());
connectChannels(otherOp->getOutputPunctuationChannel(), scJoin->getInputPunctuationChannel());
//connect output of current scaleJoin instance with the combining queue operator
CREATE_LINK(scJoin, combine);
//add queue and scaleJoin instance to the vectors
scQueueVec.push_back(qu);