Commit 8d285e08 authored by Philipp Götze's avatar Philipp Götze
Browse files

Integrated matrix support by Vadim Mishanin

parent d1a408c5
# - Try to find Eigen3 lib
#
# This module supports requiring a minimum version, e.g. you can do
# find_package(Eigen3 3.1.2)
# to require version 3.1.2 or newer of Eigen3.
#
# Once done this will define
#
# EIGEN3_FOUND - system has eigen lib with correct version
# EIGEN3_INCLUDE_DIR - the eigen include directory
# EIGEN3_VERSION - eigen version
#
# This module reads hints about search locations from
# the following enviroment variables:
#
# EIGEN3_ROOT
# EIGEN3_ROOT_DIR
# Copyright (c) 2006, 2007 Montel Laurent, <montel@kde.org>
# Copyright (c) 2008, 2009 Gael Guennebaud, <g.gael@free.fr>
# Copyright (c) 2009 Benoit Jacob <jacob.benoit.1@gmail.com>
# Redistribution and use is allowed according to the terms of the 2-clause BSD license.
if(NOT Eigen3_FIND_VERSION)
if(NOT Eigen3_FIND_VERSION_MAJOR)
set(Eigen3_FIND_VERSION_MAJOR 2)
endif(NOT Eigen3_FIND_VERSION_MAJOR)
if(NOT Eigen3_FIND_VERSION_MINOR)
set(Eigen3_FIND_VERSION_MINOR 91)
endif(NOT Eigen3_FIND_VERSION_MINOR)
if(NOT Eigen3_FIND_VERSION_PATCH)
set(Eigen3_FIND_VERSION_PATCH 0)
endif(NOT Eigen3_FIND_VERSION_PATCH)
set(Eigen3_FIND_VERSION "${Eigen3_FIND_VERSION_MAJOR}.${Eigen3_FIND_VERSION_MINOR}.${Eigen3_FIND_VERSION_PATCH}")
endif(NOT Eigen3_FIND_VERSION)
macro(_eigen3_check_version)
file(READ "${EIGEN3_INCLUDE_DIR}/Eigen/src/Core/util/Macros.h" _eigen3_version_header)
string(REGEX MATCH "define[ \t]+EIGEN_WORLD_VERSION[ \t]+([0-9]+)" _eigen3_world_version_match "${_eigen3_version_header}")
set(EIGEN3_WORLD_VERSION "${CMAKE_MATCH_1}")
string(REGEX MATCH "define[ \t]+EIGEN_MAJOR_VERSION[ \t]+([0-9]+)" _eigen3_major_version_match "${_eigen3_version_header}")
set(EIGEN3_MAJOR_VERSION "${CMAKE_MATCH_1}")
string(REGEX MATCH "define[ \t]+EIGEN_MINOR_VERSION[ \t]+([0-9]+)" _eigen3_minor_version_match "${_eigen3_version_header}")
set(EIGEN3_MINOR_VERSION "${CMAKE_MATCH_1}")
set(EIGEN3_VERSION ${EIGEN3_WORLD_VERSION}.${EIGEN3_MAJOR_VERSION}.${EIGEN3_MINOR_VERSION})
if(${EIGEN3_VERSION} VERSION_LESS ${Eigen3_FIND_VERSION})
set(EIGEN3_VERSION_OK FALSE)
else(${EIGEN3_VERSION} VERSION_LESS ${Eigen3_FIND_VERSION})
set(EIGEN3_VERSION_OK TRUE)
endif(${EIGEN3_VERSION} VERSION_LESS ${Eigen3_FIND_VERSION})
if(NOT EIGEN3_VERSION_OK)
message(STATUS "Eigen3 version ${EIGEN3_VERSION} found in ${EIGEN3_INCLUDE_DIR}, "
"but at least version ${Eigen3_FIND_VERSION} is required")
endif(NOT EIGEN3_VERSION_OK)
endmacro(_eigen3_check_version)
if (EIGEN3_INCLUDE_DIR)
# in cache already
_eigen3_check_version()
set(EIGEN3_FOUND ${EIGEN3_VERSION_OK})
else (EIGEN3_INCLUDE_DIR)
# search first if an Eigen3Config.cmake is available in the system,
# if successful this would set EIGEN3_INCLUDE_DIR and the rest of
# the script will work as usual
find_package(Eigen3 ${Eigen3_FIND_VERSION} NO_MODULE QUIET)
if(NOT EIGEN3_INCLUDE_DIR)
find_path(EIGEN3_INCLUDE_DIR NAMES signature_of_eigen3_matrix_library
HINTS
ENV EIGEN3_ROOT
ENV EIGEN3_ROOT_DIR
PATHS
${CMAKE_INSTALL_PREFIX}/include
${KDE4_INCLUDE_DIR}
PATH_SUFFIXES eigen3 eigen
)
endif(NOT EIGEN3_INCLUDE_DIR)
if(EIGEN3_INCLUDE_DIR)
_eigen3_check_version()
endif(EIGEN3_INCLUDE_DIR)
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(Eigen3 DEFAULT_MSG EIGEN3_INCLUDE_DIR EIGEN3_VERSION_OK)
mark_as_advanced(EIGEN3_INCLUDE_DIR)
endif(EIGEN3_INCLUDE_DIR)
......@@ -8,23 +8,30 @@ set (PipeFabric_VERSION_MINOR 2)
include(CTest)
#############################
# customization section
#############################
#
################################################################################
# customization section #
################################################################################
# Installation path
set(PIPEFABRIC_DIR "/usr/local/pfabric")
#The following variables enable or disable additional functionalities, which can be switched off to reduce build time.
# The following variables enable or disable additional functionalities,
# which can be switched off to reduce build time.
# Support Matrix Operations (needs Eigen library to be installed)
option(SUPPORT_MATRICES
"support matrix operations as tuple and state type"
OFF)
#Build use cases
option(BUILD_USE_CASES "" OFF)
# Build use cases
option(BUILD_USE_CASES
""
OFF)
# Use the boost::spirit parser for converting strings to numbers
option(USE_BOOST_SPIRIT_PARSER
"use the boost::spirit::qi parsers for converting strings to tuple attributes"
ON
"use the boost::spirit::qi parsers for converting strings to tuple attributes"
ON
)
# Use RocksDB key-value store for implementing tables
......@@ -61,6 +68,15 @@ option(BUILD_BENCHMARKS
ON
)
################################
# End of customization section #
################################
# Use cases require matrix support (image and graph processing)
if(BUILD_USE_CASES)
set(SUPPORT_MATRICES)
endif()
# Benchmark test requires benchmark library
if (BUILD_BENCHMARKS)
set(BUILD_GOOGLE_BENCH ON)
......@@ -83,14 +99,10 @@ endif()
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -Wno-unused -Wno-uninitialized")
# End of customization section
#---------------------------------------------------------------------------
# Add our CMake directory to CMake's module path
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/../cmake/")
# We download some 3rdparty modules from github.com before building
# the project.
# We download some 3rdparty modules from github.com before building the project.
include(Download3rdParty)
if(NOT ${CMAKE_SYSTEM_NAME} MATCHES "Darwin")
......@@ -101,9 +113,9 @@ else()
set(DYLIB_LIBRARY "")
endif()
#############################
# memory allocator libraries
#############################
##############################
# memory allocator libraries #
##############################
#
find_package(JeMalloc)
find_package(Tcmalloc)
......@@ -213,6 +225,27 @@ if (ZEROMQ_FOUND)
link_directories(${ZEROMQ_LIBRARY_DIR})
endif(ZEROMQ_FOUND)
#-----------------------------------------------------------------------------------------
#
# Matrix Support
#
if(SUPPORT_MATRICES)
####################################
# Eigen library for linear algebra #
####################################
find_package(Eigen3)
if(EIGEN3_FOUND)
message(STATUS "using Eigen3 library for linear algebra")
include_directories(${EIGEN3_INCLUDE_DIR})
else()
message(STATUS "Eigen3 not found")
endif()
add_definitions(-DSUPPORT_MATRICES)
endif()
#-----------------------------------------------------------------------------------------
#
# Building PipeFabric core library
......@@ -321,6 +354,16 @@ add_subdirectory(bench)
# Build use cases
#
if(BUILD_USE_CASES)
#######################################
# OpenCV library for image processing #
#######################################
find_package(OpenCV REQUIRED)
if(OpenCV_FOUND)
message(STATUS "using OpenCV library for image processing")
include_directories(${OpenCV_INCLUDE_DIR})
else()
message(STATUS "OpenCV not found")
endif()
add_subdirectory(usecases)
endif()
#-----------------------------------------------------------------------------------------
......
......@@ -39,6 +39,11 @@
namespace qi = boost::spirit::qi;
#endif
#ifdef SUPPORT_MATRICES
#include "core/StreamElementTraits.hpp"
#include "matrix/Matrix.hpp"
#include "matrix/VectorParser.hpp"
#endif
namespace pfabric {
......@@ -244,6 +249,58 @@ public:
}
};
#ifdef SUPPORT_MATRICES
/**
* @brief Atribute parser for sparse vector
**/
template<typename CellType>
class StringAttributeParser<pfabric::SparseVector<CellType> > :
public AttributeParserBase<pfabric::SparseVector<CellType>, StringAttributeParser<pfabric::SparseVector<CellType> > > {
public:
// the attribute should be initialized
typedef pfabric::SparseVector<CellType> Attribute;
/**
* @brief Parse reads vector of values from an attribute of tuple (e.g. TuplePtr< int, int, v1 v2 v3 ... >)
*
* @param[in] input
* string of values (v1 v2 v3 ...)
* @param[out] matrix
* the matrix which should be initialized by string of values from a tuple.
*/
static inline
void parse(const std::string &input, Attribute &vector) {
VectorParser::parse(input, vector);
}
};
/**
* @brief Atribute parser for dense vector
*/
template<typename CellType, int Rows, int Cols>
class StringAttributeParser<pfabric::DenseMatrix<CellType, Rows, Cols> > :
public AttributeParserBase<pfabric::DenseMatrix<CellType, Rows, Cols>, StringAttributeParser<pfabric::DenseMatrix<CellType, Rows, Cols>>> {
public:
// the attribute should be initialized
typedef pfabric::DenseMatrix<CellType, Rows, Cols> Attribute;
/**
* @brief Parse reads vector of values from an attribute of tuple (e.g. TuplePtr< int, int, v1 v2 v3 ... >)
*
* @param[in] input
* string of values (v1 v2 v3 ...)
* @param[out] matrix
* the matrix which should be initialized by string of values from a tuple.
*/
static inline
void parse(const std::string &input, Attribute &vector) {
VectorParser::parse(input, vector);
}
};
#endif
} /* end namespace pquery */
......
......@@ -28,6 +28,9 @@
#include "table/Table.hpp"
#include "qop/Queue.hpp"
#include "dsl/Topology.hpp"
#ifdef SUPPORT_MATRICES
#include "matrix/Matrix.hpp"
#endif
namespace pfabric {
......@@ -145,6 +148,28 @@ public:
TableInfoPtr getTableInfo(const std::string& tblName);
#ifdef SUPPORT_MATRICES
template<typename T>
std::shared_ptr<T> createMatrix(const std::string &matrixName) {
auto it = matrixMap.find(matrixName);
if(it != matrixMap.end()) {
throw std::logic_error("matrix already exists");
}
auto m = std::make_shared<T>();
matrixMap[matrixName] = m;
return m;
}
template<typename T>
std::shared_ptr<T> getMatrix(const std::string &matrixName) {
auto it = matrixMap.find(matrixName);
if (it != matrixMap.end()) {
return std::static_pointer_cast<T>(it->second);
}
throw std::logic_error("matrix not found");
}
#endif
/**
* @brief Creates a new stream with the given name and schema.
*
......@@ -174,7 +199,11 @@ private:
std::map<std::string, BaseTablePtr> mTableSet; //< a dictionary collecting all existing tables
std::map<std::string, Dataflow::BaseOpPtr> mStreamSet; //< a dictionary collecting all named streams
};
#ifdef SUPPORT_MATRICES
typedef std::shared_ptr<BaseMatrix> BaseMatrixPtr;
std::map<std::string, BaseMatrixPtr> matrixMap; //< a dictionary collecting all existing matrix
#endif
};
}
......
......@@ -58,6 +58,11 @@
#include "qop/Where.hpp"
#include "qop/ZMQSink.hpp"
#include "qop/ScaleJoin.hpp"
#ifdef SUPPORT_MATRICES
#include "qop/ToMatrix.hpp"
#include "qop/MatrixSlice.hpp"
#include "qop/MatrixMerge.hpp"
#endif
namespace pfabric {
......@@ -1446,8 +1451,74 @@ class Pipe {
throw TopologyException("No KeyExtractor defined for updateTable.");
}
}
/*------------------------------ partitioning
* -----------------------------*/
#ifdef SUPPORT_MATRICES
/**
* @brief Create a new pipe to insert tuples into matrix
* @tparam MatrixType
* the type of matrix (Sparse, Dense, etc.)
* @tparam T
* record containing values, typically TuplePtr< int, int, double >.
* @param[in]
* the matrix object to store values.
* @return
* the new pipe with operator to collect values into stateful the matrix.
*/
template<class MatrixType>
Pipe<T> toMatrix(std::shared_ptr<MatrixType> matrix) throw(TopologyException) {
try {
auto op = std::make_shared<ToMatrix<MatrixType>>(matrix);
auto iter = addPublisher<ToMatrix<MatrixType>, DataSource<T> >(op);
return Pipe<T>(dataflow, iter, keyExtractor, timestampExtractor, partitioningState, numPartitions);
} catch (...) {
throw TopologyException("toMatrix: unknown error has occured");
}
}
/**
* @brief matrix_slice
* the operator decouples a matrix into several parts
* sending them to the next operators separately
*
* @tparam PartitionFunc
* the user defined function to slice matrix
* @param[in] pred
* Predicate to decouple matrix
* @param[in] numParts
* the number of partitions
* @return a new pipe
*/
template<typename PartitionFunc>
Pipe<T> matrix_slice(PartitionFunc pred, std::size_t numParts) throw(TopologyException) {
try {
auto op = std::make_shared<MatrixSlice<T>>(pred, numParts);
auto iter = addPublisher<MatrixSlice<T>, DataSource<T> >(op);
return Pipe<T>(dataflow, iter, keyExtractor, timestampExtractor, partitioningState, numPartitions);
} catch (...) {
throw TopologyException("matrix_slice: unknown error has occured");
}
}
/**
* @brief matrix_merge
* the operator receives pieces of the matrix to put back together again
* @param[in] numParts
* the number of partitions
* @return a new pipe
*/
Pipe<T> matrix_merge(std::size_t numParts) throw(TopologyException) {
try {
auto op = std::make_shared<MatrixMerge<T>>(numParts);
auto iter = addPublisher<MatrixMerge<T>, DataSource<T> >(op);
return Pipe<T>(dataflow, iter, keyExtractor, timestampExtractor, partitioningState, numPartitions);
} catch (...) {
throw TopologyException("matrix_merge: unknown error has occured");
}
}
#endif
/*------------------------------ partitioning
* -----------------------------*/
/**
* @brief Create a PartitionBy operator.
*
......
......@@ -40,7 +40,9 @@
#include "qop/FromTable.hpp"
#include "qop/SelectFromTable.hpp"
#include "qop/StreamGenerator.hpp"
#ifdef SUPPORT_MATRICES
#include "qop/FromMatrix.hpp"
#endif
#include "dsl/Pipe.hpp"
#include "dsl/Dataflow.hpp"
......@@ -232,7 +234,26 @@ namespace pfabric {
return Pipe<T>(dataflow, dataflow->addPublisher(op));
}
#ifdef SUPPORT_MATRICES
/**
* @brief Create a pipe for stream from matrix
* @tparam Matrix
* matrix type
* @tparam Matrix::StreamElement
* record type of the matrix like @c TuplePtr< int, int, double >
* @param[in] matrix
* the matrix is source of stream.
* @return
* a new pipe with new stream.
*/
template<typename Matrix>
Pipe<typename Matrix::StreamElement> newStreamFromMatrix(std::shared_ptr<Matrix> matrix) {
auto op = std::make_shared<FromMatrix< Matrix > >(matrix);
return Pipe<typename Matrix::StreamElement>(dataflow, dataflow->addPublisher(op));
}
#endif
/**
* @brief Create a new pipe where a named stream is used as input.
*
* @tparam T the type of the stream element
......
#ifndef BASEMTRIX_HH__
#define BASEMTRIX_HH__
namespace pfabric
{
struct MatrixParams
{
enum ModificationMode {
Insert = 0 //< tuple was insert
, Update //< cell was updated
, Delete //< value was deleted
};
};
/*
* Base class for other matrices classes
*/
class BaseMatrix
{
protected:
BaseMatrix(){}
public:
virtual ~BaseMatrix(){}
template<typename M, typename Index>
static void
removeRow(M &matrix, Index row) {
M temp = matrix;
matrix.resize(matrix.rows()-1, matrix.cols());
auto bottomRows = (temp.rows()-row)-1;
matrix.topRows(row) = temp.topRows(row);
matrix.bottomRows(bottomRows) = temp.bottomRows(bottomRows);
}
template<typename M, typename Index>
static void
removeCol(M &matrix, Index col)
{
M temp=matrix;
matrix.resize(matrix.rows(), matrix.cols()-1);
auto rightColSize = (temp.cols()-col)-1;
matrix.leftCols(col) = temp.leftCols(col);
matrix.rightCols(rightColSize) = temp.rightCols(rightColSize);
}
};
template<typename T>
struct MatrixTraits
{
typedef typename T::element_type element_type;
typedef typename T::IndexType IndexType;
typedef std::pair<typename T::IndexType, typename T::IndexType > edge; //< indexes of a matrix
};
}
#endif //BASEMTRIX_HH__
\ No newline at end of file
#ifndef DENSEMATRIX_HH
#define DENSEMATRIX_HH
#include <sstream>
#include "BaseMatrix.hpp"
#include "ReaderValue.hpp"
#include <Eigen/Dense>
#include <boost/uuid/uuid.hpp>
namespace pfabric
{
template<typename CellType, int Rows, int Cols>
class DenseMatrix;
template<typename M>
struct DenseIterator : std::iterator<std::forward_iterator_tag, typename M::Scalar>
{
typedef DenseIterator<M> self_type;
typedef typename M::Index IndexType;
typedef const typename M::Scalar& reference;
DenseIterator()
: matrix(nullptr)
, row(0)
, col(0)
, rows(0)
, cols(0)
{}
DenseIterator(M *matrix, IndexType i, IndexType j, IndexType rows, IndexType cols)
: matrix(matrix)
, row(i)
, col(j)
, rows(rows)
, cols(cols)
{}
DenseIterator(const self_type &rhs)
: matrix(rhs.matrix)
, row(rhs.row)
, col(rhs.col)
, rows(rhs.rows)
, cols(rhs.cols)
{}
reference
operator*() {
return (*matrix)(row, col);
}
bool operator==(const self_type &rhs) const
{
return row == rhs.row && col == rhs.col;
}
bool operator!=(const self_type &rhs) const
{
return !operator==(rhs);
}
bool operator!() const
{
return row == rows && col == cols;
}
self_type& operator++() {
incIters();
return *this;
}
self_type operator++(int) {
auto tmp = *this;
incIters();
return tmp;
}
self_type& operator--() {
decIters();
return *this;
}
self_type operator--(int) {
auto tmp = *this;
decIters();
return tmp;
}
IndexType getRow() const { return row; }
IndexType getCol() const { return col; }