Commit d9042fdb authored by Philipp Götze's avatar Philipp Götze

Merge branch 'master' into wip/nvml

Commented out table dependent test cases
parents 2beb9320 001305c8
......@@ -16,6 +16,7 @@ operators and utility classes. It consists of the following main components:
+ [Installation](documentation/Installation.md)
+ [Getting started: Using PipeFabric](/documentation/Usage.md)
+ [List of operators](/documentation/Operators.md)
+ [Table support](/documentation/Tables.md)
+ [Tutorial: How to write a query](/documentation/Tutorial.md)
+ [Tutorial: Embedding PipeFabric](/documentation/Embedding.md)
+ [Tutorial: Stream partitioning](/documentation/Partitioning.md)
......
......@@ -71,6 +71,7 @@ add_custom_command(
${THIRD_PARTY_DIR}/SimpleWeb)
#--------------------------------------------------------------------------------
if (BUILD_GOOGLE_BENCH)
# Google Benchmark framework
download_project(PROJ benchmark
GIT_REPOSITORY https://github.com/google/benchmark.git
......@@ -78,29 +79,46 @@ download_project(PROJ benchmark
UPDATE_DISCONNECTED 1
QUIET
)
add_custom_command(
OUTPUT ${THIRD_PARTY_DIR}/benchmark
COMMAND ${CMAKE_COMMAND} -E chdir ${benchmark_SOURCE_DIR} cmake -DCMAKE_BUILD_TYPE=Release
COMMAND ${CMAKE_COMMAND} -E chdir ${benchmark_SOURCE_DIR} $(MAKE)
COMMAND ${CMAKE_COMMAND} -E make_directory ${THIRD_PARTY_DIR}/benchmark/include
COMMAND ${CMAKE_COMMAND} -E make_directory ${THIRD_PARTY_DIR}/benchmark/lib
COMMAND ${CMAKE_COMMAND} -E copy_directory
${benchmark_SOURCE_DIR}/include
${THIRD_PARTY_DIR}/benchmark/include
COMMAND ${CMAKE_COMMAND} -E copy
${benchmark_SOURCE_DIR}/src/libbenchmark.a
${THIRD_PARTY_DIR}/benchmark/lib
)
endif()
#--------------------------------------------------------------------------------
if(USE_ROCKSDB_TABLE)
# RocksDB key-value store
download_project(PROJ rocksdb
GIT_REPOSITORY https://github.com/facebook/rocksdb
GIT_TAG master
UPDATE_DISCONNECTED 1
QUIET
GIT_REPOSITORY https://github.com/facebook/rocksdb
GIT_TAG v5.1.4
UPDATE_DISCONNECTED 1
QUIET
)
add_custom_command(
OUTPUT ${THIRD_PARTY_DIR}/rocksdb
COMMAND ${CMAKE_COMMAND} -E chdir ${rocksdb_SOURCE_DIR} $(MAKE) static_lib
COMMAND ${CMAKE_COMMAND} -E make_directory ${THIRD_PARTY_DIR}/rocksdb/include
COMMAND ${CMAKE_COMMAND} -E make_directory ${THIRD_PARTY_DIR}/rocksdb/lib
COMMAND ${CMAKE_COMMAND} -E copy_directory
${rocksdb_SOURCE_DIR}/include
${THIRD_PARTY_DIR}/rocksdb/include
COMMAND ${CMAKE_COMMAND} -E copy
${rocksdb_SOURCE_DIR}/librocksdb.a
${THIRD_PARTY_DIR}/rocksdb/lib
OUTPUT ${THIRD_PARTY_DIR}/rocksdb
COMMAND ${CMAKE_COMMAND} -E chdir ${rocksdb_SOURCE_DIR} $(MAKE) static_lib
COMMAND ${CMAKE_COMMAND} -E make_directory ${THIRD_PARTY_DIR}/rocksdb/include
COMMAND ${CMAKE_COMMAND} -E make_directory ${THIRD_PARTY_DIR}/rocksdb/lib
COMMAND ${CMAKE_COMMAND} -E copy_directory
${rocksdb_SOURCE_DIR}/include
${THIRD_PARTY_DIR}/rocksdb/include
COMMAND ${CMAKE_COMMAND} -E copy
${rocksdb_SOURCE_DIR}/librocksdb.a
${THIRD_PARTY_DIR}/rocksdb/lib
)
endif()
#--------------------------------------------------------------------------------
if(USE_NVML_TABLE)
# Non-Volatile Memory Library (pmem.io)
download_project(PROJ nvml
GIT_REPOSITORY https://github.com/pmem/nvml.git
......@@ -117,3 +135,4 @@ add_custom_command(
# ${nvml_SOURCE_DIR}/src/include
# ${THIRD_PARTY_DIR}/nvml/include
)
endif()
......@@ -11,6 +11,7 @@ macro( build_executable arg )
${Boost_SYSTEM_LIBRARY}
${ROCKSDB_LIB}
${NVML_LIBRARIES}
${BENCHMARK_LIB}
${MALLOC_LIB}
)
endmacro( build_executable )
......
......@@ -11,7 +11,7 @@ incoming tuple. Thus, the example above can be modified:
```C++
auto s = t->newStreamFromREST(8099, "^/publish$", RESTSource::POST_METHOD)
...
.notify<ResultTuplePtr>([&](auto tp, bool outdated) {
.notify([&](auto tp, bool outdated) {
std::cout << tp << std::endl;
});
......
......@@ -8,8 +8,9 @@ PipeFabric relies on several open source components which have to be installed b
+ [ZeroMQ](http://zeromq.org/) socket library (including [zmq.hpp](https://github.com/zeromq/cppzmq/blob/master/zmq.hpp))
+ JeMalloc or TCMalloc library (optional)
There are some additional 3rd party libraries such as [Catch](https://github.com/philsquared/Catch) for testing, [SimpleWeb](https://github.com/eidheim/Simple-Web-Server), [Format](https://github.com/fmtlib/fmt), and [JSON](https://github.com/nlohmann/json) but
they are either included or downloaded during the build.
There are some additional 3rd party libraries such as [Catch](https://github.com/philsquared/Catch) for testing,
[SimpleWeb](https://github.com/eidheim/Simple-Web-Server), [Format](https://github.com/fmtlib/fmt), [RocksDB](https://github.com/facebook/rocksdb),
and [JSON](https://github.com/nlohmann/json) but they are either included or downloaded during the build.
After cloning the repository, compile everything:
......
......@@ -12,9 +12,9 @@ are finally merged into a single stream again.
```C++
auto s = t->newStreamFromFile("data.csv")
.extract<Tin>(',')
.partitionBy<Tin>([](auto tp) { return getAttribute<0>(tp) % 5; }, 5)
.where<Tin>([](auto tp, bool outdated) { return getAttribute<0>(tp) % 2 == 0; } )
.map<Tin, Tout>([](auto tp) { return makeTuplePtr(getAttribute<0>(tp)); } )
.merge<Tout>()
..print<Tout>(std::cout);
.partitionBy([](auto tp) { return get<0>(tp) % 5; }, 5)
.where([](auto tp, bool outdated) { return get<0>(tp) % 2 == 0; } )
.map<Tout>([](auto tp) { return makeTuplePtr(get<0>(tp)); } )
.merge()
.print(std::cout);
```
......@@ -29,13 +29,13 @@ using namespace pfabric;
BUILDER_CLASS(Query_2)
typedef TuplePtr<Tuple<int, double>> Tuple_1_Type_;
typedef TuplePtr<int, double> Tuple_1_Type_;
PFabricContext::TopologyPtr Query_2::create(PFabricContext& ctx) {
auto SENSOR_DATA = ctx.getTable<Tuple_1_Type_::element_type, int>("SENSOR_DATA");
topology = ctx.createTopology();
topology->selectFromTable<Tuple_1_Type_, int>(SENSOR_DATA)
.print<Tuple_1_Type_>();
.print();
return topology;
}
......
### Persistent Tables ###
Though PipeFabric is a framework for data stream processing, it also supports the concept of
tables for storing data persistently. Tables can be used either as sources of data streams (i.e. updates),
as persistent sinks of data streams, or to manage the state of operators in a persistent way.
Currently, two different table implementations are supported (both with the same interface):
+ hashmap-based in-memory table,
+ disk-based table using RocksDB for managing data.
The table implementation is chosen at compile time by setting the option `USE_ROCKSDB_TABLE` in
`src/CMakeLists.txt`.
#### Creating a Table ####
Tables have to be created explicitly before use by specifying the schema in the form of a
`Tuple` type passed as a template argument to the `createTable` method. In the following example,
a table `TEST_TBL` with three columns is created:
```C++
typedef Tuple<int, std::string, double> RecordType;
PFabricContext ctx;
auto myTable = ctx.createTable<RecordType, int>("TEST_TBL");
```
Note that a key has to be specified for each table; the data type of the key (`int` in the example
above) is passed as the second template argument. Furthermore, in contrast to the type
arguments of the streaming operators, the `createTable` method requires a `Tuple` type, not a `TuplePtr`.
The key is always part of the tuple type.
There is an alternative way of creating a new table that allows specifying a complete schema consisting
of column names and types using a `TableInfo` object. This approach is needed if ad-hoc queries on tables
have to be supported.
```C++
...
TableInfo tblInfo("TEST_TBL",
                  { ColumnInfo("col1", ColumnInfo::Int_Type),
                    ColumnInfo("col2", ColumnInfo::String_Type),
                    ColumnInfo("col3", ColumnInfo::Double_Type) },
                  ColumnInfo::Int_Type); // type of the key column
auto myTable = ctx.createTable<RecordType, int>(tblInfo);
```
#### Accessing a Table ####
Tables in PipeFabric support different ways of storing, updating, and retrieving data:
+ via the Table API, i.e. insert, deleteByKey, deleteWhere, updateByKey, ...
+ via the streaming operator DSL, i.e. toTable, newStreamFromTable, updateTable, ...
+ via the ad-hoc query facility.
The Table API provides basic functions to insert, update, delete, and retrieve table records; a short
usage sketch follows the list below. In the list, `KeyType` refers to the data type of the key and
`RecordType` to the tuple type representing the schema.
+ `insert(KeyType k, const RecordType& rec)` inserts a new record with the given key into the table.
+ `deleteByKey(KeyType k)` deletes the record with the given key.
+ `deleteWhere(std::function<bool(const RecordType&)> predicate)` deletes all records satisfying the given predicate.
+ `updateByKey(KeyType k, std::function<void(RecordType&)> updater)` updates the tuple with the given key by applying
the function `updater`, which accepts the tuple as a parameter and modifies it in place.
+ `updateWhere(std::function<bool(const RecordType&)> predicate, std::function<void(RecordType&)> updater)` updates all tuples from
the table satisfying the given predicate by applying the function `updater` which modifies the tuple.
+ `getByKey(KeyType k)` returns a pointer to the tuple with the given key in the form of `TuplePtr<RecordType>`. If no tuple
exists for this key, then an exception is raised.
+ `select(std::function<bool(const RecordType&)> predicate)` returns all records satisfying the given predicate.
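For illustration, here is a minimal sketch of the Table API using the `RecordType` and table from
the creation example above. The umbrella header name and the record accessor in the updater are
assumptions; PipeFabric may expose them slightly differently.
```C++
#include <string>
#include "pfabric.hpp" // assumption: umbrella header exposing the context and table API

using namespace pfabric;

typedef Tuple<int, std::string, double> RecordType;

int main() {
  PFabricContext ctx;
  auto myTable = ctx.createTable<RecordType, int>("TEST_TBL");

  // insert a record with key 42
  myTable->insert(42, RecordType(42, std::string("answer"), 3.14));

  // update the record in place via a user-provided updater
  myTable->updateByKey(42, [](RecordType& rec) {
    std::get<2>(rec) = 2.71; // assumption: Tuple is compatible with std::get
  });

  // getByKey returns a TuplePtr<RecordType> and raises an exception
  // if no record exists for the given key
  auto tp = myTable->getByKey(42);

  // delete the record again
  myTable->deleteByKey(42);
  return 0;
}
```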
Streaming operators are used as part of constructing a topology. The table-related operators are
listed below (see [Operators](Operators.md) for a detailed description); a sketch of their use follows the list:
+ `newStreamFromTable` constructs a new data stream from updates on the given table. Whenever a
tuple in the table is updated, a new stream element is constructed and published.
+ `selectFromTable` is used to perform a standard (batch) query on a table by selecting all
tuples satisfying an optionally specified predicate. Though this also constructs a data stream,
the stream ends when the last tuple in the table is reached.
+ `toTable` writes the elements of a data stream to the given table, either inserting new tuples,
updating existing tuples (identified by the key), or deleting tuples from the table (in case
of outdated tuples).
+ `updateTable` allows stream elements to be used for updating a table. It differs from `toTable` by
performing arbitrary updates via a user-provided update function, whereas `toTable` stores the tuples
of the stream directly, i.e. the schemas of the table and the stream have to be identical.
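A sketch of the streaming side, persisting a file-based stream into the table created above. The
`keyBy` step for deriving the table key and the exact `toTable` signature are assumptions based on
the descriptions in this list:
```C++
typedef TuplePtr<int, std::string, double> T1;

PFabricContext ctx;
auto myTable = ctx.createTable<RecordType, int>("TEST_TBL"); // RecordType as defined above
auto t = ctx.createTopology();

auto s = t->newStreamFromFile("data.csv")
  .extract<T1>(',')
  .keyBy<0>()        // assumption: the table key is derived from attribute 0
  .toTable(myTable); // inserts, updates, or deletes depending on the outdated flag

t->start();
```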
......@@ -15,10 +15,10 @@ Next, we define the schema: the tuple types for representing input and output da
```C++
// the structure of tuples we receive via REST
typedef TuplePtr<Tuple<int, double> > InTuplePtr;
typedef TuplePtr<int, double> InTuplePtr;
// the structure of our output (aggregate) tuples
typedef TuplePtr<Tuple<double> > ResultTuplePtr;
typedef TuplePtr<double> ResultTuplePtr;
```
And for the aggregation we have to define a type that captures the aggregation state; a possible
definition is sketched below.
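A possible state type, assuming PipeFabric provides the `Aggregator1` and `AggrSum` helper
templates for single-value aggregates:
```C++
// aggregation state: a running sum over the double attribute
// (position 1) of the incoming tuples
typedef Aggregator1<InTuplePtr, AggrSum<double>, 1> MyAggrState;
```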
......@@ -51,9 +51,9 @@ int main(int argc, char **argv) {
auto s = t->newStreamFromREST(8099, "^/publish$", RESTSource::POST_METHOD)
.extractJson<InTuplePtr>({"key", "data"})
.slidingWindow<InTuplePtr>(WindowParams::RowWindow, 10)
.aggregate<InTuplePtr, ResultTuplePtr, MyAggrState> ()
.print<ResultTuplePtr>(std::cout);
.slidingWindow(WindowParams::RowWindow, 10)
.aggregate<MyAggrState> ()
.print(std::cout);
t->start();
t->wait();
......
......@@ -4,11 +4,12 @@ The main data structure for representing elements of a data stream is the `Tuple
represents a template class which can be parametrized with the attribute types of the element. Note
that timestamps are not represented separately: timestamps can be derived from any attribute (or
a combination of attributes). Furthermore, tuples are not copied around but only passed by reference.
For this purpose, the `TuplePtr<>` template is used. Thus, a complete schema definition for a stream
For this purpose, usually the `TuplePtr<>` template is used instead of `Tuple`, which simply wraps
a tuple with an intrusive pointer. Thus, a complete schema definition for a stream
looks like the following:
```C++
typedef TuplePtr<Tuple<int, std::string, double> > T1;
typedef TuplePtr<int, std::string, double> T1;
```
Tuples can be constructed using the `makeTuplePtr` function which of course requires correctly
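For illustration, a tuple matching the schema `T1` could be constructed as follows (a sketch; the
arguments must match the declared attribute types):
```C++
auto tp = makeTuplePtr(1, std::string("a string"), 2.5);
// access attributes via get<>: get<0>(tp) == 1, get<2>(tp) == 2.5
```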
......@@ -31,17 +32,17 @@ which allows to specify processing steps in a DSL very similar to Apache Spark.
code snippet gives an example. See below for an explanation of the provided operators.
```C++
typedef TuplePtr<Tuple<int, std::string, double> > T1;
typedef TuplePtr<Tuple<double, int> > T2;
typedef TuplePtr<int, std::string, double> T1;
typedef TuplePtr<double, int> T2;
Topology t;
auto s = t.newStreamFromFile("file.csv")
.extract<T1>(',')
.where<T1>([](auto tp, bool outdated) { return get<0>(tp) % 2 == 0; } )
.map<T1,T2>([](auto tp) -> T2 {
.where([](auto tp, bool outdated) { return get<0>(tp) % 2 == 0; } )
.map<T2>([](auto tp) -> T2 {
return makeTuplePtr(get<2>(tp), get<0>(tp));
})
.print<T2>(std::cout);
.print(std::cout);
t.start();
```
cmake_minimum_required(VERSION 3.0)
cmake_minimum_required(VERSION 3.2)
project (pipefabric)
set(CMAKE_MACOSX_RPATH 1)
......@@ -8,22 +8,6 @@ set (PipeFabric_VERSION_MINOR 2)
include(CTest)
# Add our CMake directory to CMake's module path
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/../cmake/")
# We download some 3rdparty modules from github.com before building
# the project.
include(Download3rdParty)
# Provide compile commands file to source directory (for YCM)
SET( CMAKE_EXPORT_COMPILE_COMMANDS ON )
IF( EXISTS "${CMAKE_CURRENT_BINARY_DIR}/compile_commands.json" )
EXECUTE_PROCESS( COMMAND ${CMAKE_COMMAND} -E copy_if_different
${CMAKE_CURRENT_BINARY_DIR}/compile_commands.json
${CMAKE_CURRENT_SOURCE_DIR}/compile_commands.json
)
ENDIF()
#############################
# customization section
#############################
......@@ -32,6 +16,8 @@ ENDIF()
# Installation path
set(PIPEFABRIC_DIR "/usr/local/pfabric")
# The following variables enable or disable additional functionality, which can be switched off to reduce build time.
# Use the boost::spirit parser for converting strings to numbers
option(USE_BOOST_SPIRIT_PARSER
"use the boost::spirit::qi parsers for converting strings to tuple attributes"
......@@ -39,6 +25,7 @@ option(USE_BOOST_SPIRIT_PARSER
)
# Use RocksDB key-value store for implementing tables
# If switched to off, it will not be downloaded, saving initial building time.
option(USE_ROCKSDB_TABLE
"use RocksDB for implementing persistent tables"
OFF
......@@ -50,6 +37,37 @@ option(USE_NVML_TABLE
ON
)
# Build only pipefabric libraries (cep and core)
# If switched to on, no other components (like RestDemo, QueryCompiler,...) will be built.
# Building test cases is independent of this.
option(BUILD_ONLY_LIBS
"build only the two pipefabric libraries"
OFF
)
# Build test cases for pipefabric functionality
# If switched to off, no tests will be built.
option(BUILD_TEST_CASES
"build tests for pipefabric functionality"
ON
)
# Build Google Benchmark library
option(BUILD_GOOGLE_BENCH
"build google benchmark"
ON
)
# Build benchmark test
option(BUILD_BENCHMARKS
"build benchmarks for pipefabric"
ON
)
# Benchmark test requires benchmark library
if (BUILD_BENCHMARKS)
set(BUILD_GOOGLE_BENCH ON)
endif()
# Force using intel compiler
#include(CMakeForceCompiler)
......@@ -57,7 +75,7 @@ option(USE_NVML_TABLE
#CMAKE_FORCE_CXX_COMPILER(icpc "Intel C++ Compiler")
# C++ compiler flags
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14 -Wall -Wno-deprecated -g -O0 -Wsign-compare")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14 -Wall -Wno-deprecated -g -O3 -Wsign-compare")
if ("${CMAKE_CXX_COMPILER_ID}" MATCHES "Clang")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-unused-local-typedefs -Wno-#pragma-messages")
elseif("${CMAKE_CXX_COMPILER_ID}" MATCHES "GNU")
......@@ -70,6 +88,14 @@ set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -Wno-unused -Wno-uninitialized")
# End of customization section
#---------------------------------------------------------------------------
# Add our CMake directory to CMake's module path
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/../cmake/")
# We download some 3rdparty modules from github.com before building
# the project.
include(Download3rdParty)
if(NOT ${CMAKE_SYSTEM_NAME} MATCHES "Darwin")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -pthread")
......@@ -94,6 +120,17 @@ else()
set(MALLOC_LIB "")
endif()
########################
# Google benchmark library
########################
#
if (BUILD_GOOGLE_BENCH)
include_directories("${THIRD_PARTY_DIR}/benchmark/include")
set (BENCHMARK_LIB "${THIRD_PARTY_DIR}/benchmark/lib/libbenchmark.a")
else()
set (BENCHMARK_LIB "")
endif()
########################
# RocksDB database library
########################
......@@ -148,7 +185,7 @@ endif()
# Boost C++ library
######################
#
SET(BOOST_MIN_VERSION "1.61.0")
SET(BOOST_MIN_VERSION "1.60.0")
find_package(Boost ${BOOST_MIN_VERSION} REQUIRED COMPONENTS
program_options
system
......@@ -182,13 +219,13 @@ set(BOOST_LIBRARIES
${Boost_LOG_LIBRARY}
${Boost_COROUTINE_LIBRARY}
${Boost_IOSTREAMS_LIBRARY}
${Boost_FILESYSTEM_LIBRARY}
${Boost_FILESYSTEM_LIBRARY}
${Boost_SYSTEM_LIBRARY}
${Boost_CHRONO_LIBRARY}
${Boost_TIMER_LIBRARY}
${Boost_THREAD_LIBRARY}
${Boost_REGEX_LIBRARY}
${DYLIB_LIBRARY}
${DYLIB_LIBRARY}
)
######################
......@@ -244,7 +281,14 @@ if(USE_ROCKSDB_TABLE)
set(core_sources
${core_sources}
${THIRD_PARTY_DIR}/rocksdb
)
)
endif()
if(BUILD_GOOGLE_BENCH)
set(core_sources
${core_sources}
${THIRD_PARTY_DIR}/benchmark
)
endif()
if(USE_NVML_TABLE)
......@@ -258,7 +302,7 @@ else()
${core_sources}
nvm/VTableInfo.cpp
)
endif(USE_NVML_TABLE)
endif()
add_library(pfabric_core SHARED
${core_sources}
......@@ -308,6 +352,11 @@ target_include_directories(Catch INTERFACE test)
enable_testing()
add_subdirectory(test)
#-----------------------------------------------------------------------------------------
#
# Micro-benchmarking using Google Benchmark
#
add_subdirectory(bench)
#-----------------------------------------------------------------------------------------
#
# Installation
......
include(../../cmake/Testing.cmake.in)
if (BUILD_BENCHMARKS)
do_test(TopologyBenchmarks)
endif()
/*
* Copyright (c) 2014-17 The PipeFabric team,
* All Rights Reserved.
*
* This file is part of the PipeFabric package.
*
* PipeFabric is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License (GPL) as
* published by the Free Software Foundation; either version 2 of
* the License, or (at your option) any later version.
*
* This package is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; see the file LICENSE.
* If not you can find the GPL at http://www.gnu.org/copyleft/gpl.html
*/
#include <sstream>
#include <thread>
#include <chrono>
#include <future>
#include <boost/core/ignore_unused.hpp>
#include <boost/filesystem.hpp>
#include "TestDataGenerator.hpp"
#include "core/Tuple.hpp"
#include "table/Table.hpp"
#include "dsl/Topology.hpp"
#include "dsl/Pipe.hpp"
#include "benchmark/include/benchmark/benchmark.h"
using namespace pfabric;
using namespace ns_types;
//Syntax for benchmarking: "method(benchmark::State& state)"
//The state drives the measurement loop of the benchmark.
//Additionally, the method has to be registered by
//"BENCHMARK(method)".
/**
*Testing method one: "map" before "where"
*Here a projection is done before a selection (bad), resulting in a higher
*running time for the query.
*/
void TopologyMapWhereTest(benchmark::State& state) {
typedef TuplePtr<int, std::string, double> T1;
typedef TuplePtr<double, int> T2;
TestDataGenerator tgen("file.csv");
tgen.writeData(1000);
//code inside the while-loop is tested (in this case for running time)
while (state.KeepRunning()) {
Topology t;
auto s = t.newStreamFromFile("file.csv")
.extract<T1>(',')
//map <int, string, double> to <double, int>
.map<T2>([](auto tp, bool outdated) -> T2 {
return makeTuplePtr(get<2>(tp), get<0>(tp));
})
//keep only tuples whose <int> value is a multiple of 50
.where([](auto tp, bool outdated) { return get<1>(tp) % 50 == 0; });
t.start();
t.wait();
}
}
//register method for testing
BENCHMARK(TopologyMapWhereTest);
/**
*Testing method two: "where" before "map"
*Here a selection is done before a projection (good), resulting in a lower
*running time for the query.
*/
void TopologyWhereMapTest(benchmark::State& state) {
typedef TuplePtr<int, std::string, double> T1;
typedef TuplePtr<double, int> T2;
TestDataGenerator tgen("file.csv");
tgen.writeData(1000);
while (state.KeepRunning()) {
Topology t;
auto s = t.newStreamFromFile("file.csv")
.extract<T1>(',')
.where([](auto tp, bool outdated) { return get<0>(tp) % 50 == 0; })
.map<T2>([](auto tp, bool outdated) -> T2 {
return makeTuplePtr(get<2>(tp), get<0>(tp));
});
t.start();
t.wait();
}
}
BENCHMARK(TopologyWhereMapTest);
/**
*Testing method three: partitioned "where" before "map"
*In addition to method two, a partitioning with three partitions is used.
*Because the operators are simple and fast, the partitioning overhead outweighs the gain,
*resulting in a higher running time.
*/
void TopologyPartitionedWhereBeforeMapTest(benchmark::State& state) {
typedef TuplePtr<int, std::string, double> T1;
typedef TuplePtr<double, int> T2;
TestDataGenerator tgen("file.csv");
tgen.writeData(1000);
while (state.KeepRunning()) {
Topology t;
auto s = t.newStreamFromFile("file.csv")
.extract<T1>(',')
.partitionBy([](auto tp) { return get<0>(tp) % 3; }, 3)
.where([](auto tp, bool outdated) { return get<0>(tp) % 50 == 0; })
.map<T2>([](auto tp, bool outdated) -> T2 {
return makeTuplePtr(get<2>(tp), get<0>(tp));
})
.merge();
t.start();
t.wait();
//BAD: Takes far too long because of iteration number
//wait for results - stop timer
//state.PauseTiming();
//std::this_thread::sleep_for(std::chrono::milliseconds(100));
//state.ResumeTiming();
}
}
BENCHMARK(TopologyPartitionedWhereBeforeMapTest);
//Some math operation used by the next two testing methods
double doMath(double input) {
double result = 0;