Commit bc398a6d authored by Constantin Pohl's avatar Constantin Pohl
Browse files

Added more test cases

parent bf9934ab
......@@ -26,22 +26,22 @@ option(USE_BOOST_SPIRIT_PARSER
# Use RocksDB key-value store for implementing tables
# If switched to off, it will not be downloaded, saving initial building time.
option(USE_ROCKSDB_TABLE "" ON)
option(USE_ROCKSDB_TABLE "" OFF)
# Build only pipefabric libraries (cep and core)
# If switched to on, no other components (like RestDemo, QueryCompiler,...) will be built.
# Building test cases is independent from this.
option(BUILD_ONLY_LIBS "" OFF)
option(BUILD_ONLY_LIBS "" ON)
# Build test cases for pipefabric functionality
# If switched to off, no tests will be build
option(BUILD_TEST_CASES "" ON)
option(BUILD_TEST_CASES "" OFF)
#Build google benchmark library
option(BUILD_GOOGLE_BENCH "" ON)
option(BUILD_GOOGLE_BENCH "" OFF)
# Build benchmark test
option(BUILD_BENCHMARKS "" ON)
option(BUILD_BENCHMARKS "" OFF)
# Use memkind library (Xeon Phi MCDRAM)
option(USE_MEMKIND "" ON)
......@@ -308,6 +308,11 @@ target_include_directories(Catch INTERFACE test)
enable_testing()
add_subdirectory(test)
#-----------------------------------------------------------------------------------------
#
# MCDRAM stuff
#
add_subdirectory(mem)
#-----------------------------------------------------------------------------------------
......
......@@ -26,12 +26,15 @@
#include <vector>
#include <unordered_map>
#include "memkind/include/hbw_allocator.h" //hbm
#ifdef USE_MEMKIND
#include "memkind/include/hbw_allocator.h" //hbm
#endif
#include "../pfabric.hpp"
using namespace pfabric;
#ifdef USE_MEMKIND
TEST_CASE("Using the memkind allocator for high-bandwidth memory", "[HBMallocator]") {
//INTEGER
......@@ -101,9 +104,9 @@ TEST_CASE("Using the memkind allocator for high-bandwidth memory", "[HBMallocato
sizeof(double), 0)<<" (0 is HBM, -1 otherwise)"<<std::endl;
}
}
#endif
TEST_CASE("Using the memkind allocator with PipeFabric", "[HBMallocator PipeFabric]") {
TEST_CASE("Using PipeFabric join (HBM/STD)", "[PipeFabric join]") {
typedef TuplePtr<int, std::string, double> tpPtr;
StreamGenerator<tpPtr>::Generator streamGen ([](unsigned long n) -> tpPtr {
......@@ -121,9 +124,7 @@ TEST_CASE("Using the memkind allocator with PipeFabric", "[HBMallocator PipeFabr
auto s2 = t->streamFromGenerator<tpPtr>(streamGen, num)
.keyBy<int>([](auto tp) { return get<0>(tp); })
//.partitionBy([](auto tp) { return get<0>(tp)%5; }, 5)
.join<int>(s1, [](auto tp1, auto tp2) { return get<0>(tp1) == get<0>(tp2); })
//.merge()
.notify([&](auto tp, bool outdated){
res++;
})
......
......@@ -24,58 +24,106 @@
#include "catch.hpp"
#include <vector>
#include <chrono>
#include <chrono> //time
#include <memory> //std::allocator
//#include <cmath> //pow
//#include <omp.h> //threads
#include "memkind/include/hbw_allocator.h" //hbm
#ifdef USE_MEMKIND
#include "memkind/include/hbw_allocator.h" //hbm
#endif
#include "../pfabric.hpp"
TEST_CASE("Vector Timings with and without HBM", "[HBMTiming]") {
using namespace pfabric;
const int nrOfIter = 1000000;
std::chrono::high_resolution_clock::time_point start, end;
const int nrOfIter = 1000000;
hbw::allocator<std::vector<int> > allocVec;
std::vector<int>* vecPtr = allocVec.allocate(1);
#ifdef USE_MEMKIND
TEST_CASE("Timings for vector element allocation with HBM", "[HBMEle]") {
std::chrono::high_resolution_clock::time_point start, end;
std::vector<int, hbw::allocator<int> > vec(nrOfIter);
start = std::chrono::high_resolution_clock::now();
for(auto i=0; i<nrOfIter; i++) {
vecPtr->push_back(i);
vec[i] = i;
}
end = std::chrono::high_resolution_clock::now();
REQUIRE(vecPtr->size() == nrOfIter);
REQUIRE(vec.size() == nrOfIter);
std::cout<<"Memory Region vector element 0: "<<hbw_verify_memory_region(&vec[0],
sizeof(int), 0)<<" (0 is HBM, -1 otherwise)"<<std::endl;
std::cout<<"Memory Region vector element "<<(nrOfIter-1)<<": "<<hbw_verify_memory_region(&vec[(nrOfIter-1)],
sizeof(int), 0)<<" (0 is HBM, -1 otherwise)"<<std::endl;
std::cout<<"Memory Region vector hbm allocator: "<<hbw_verify_memory_region(vecPtr, (sizeof(vecPtr)+vecPtr->size()*sizeof(int)), 0)<<" (0 is HBM, -1 otherwise)"<<std::endl;
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end-start).count();
std::cout<<"Time needed: "<<duration<<std::endl;
auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(end-start).count();
std::cout<<"Time needed (hbm element alloc): "<<duration<<"ns"<<std::endl;
}
std::vector<int, hbw::allocator<int> > vec;
std::vector<int, hbw::allocator<int> >* vecIntPtr = &vec;
TEST_CASE("Timings for vector element allocation without HBM", "[stdEle]") {
std::chrono::high_resolution_clock::time_point start, end;
std::vector<int, std::allocator<int> > vec(nrOfIter);
start = std::chrono::high_resolution_clock::now();
for(auto i=0; i<nrOfIter; i++) {
vecIntPtr->push_back(i);
vec[i] = i;
}
end = std::chrono::high_resolution_clock::now();
REQUIRE(vecIntPtr->size() == nrOfIter);
REQUIRE(vec.size() == nrOfIter);
std::cout<<"Memory Region vector element hbm allocator: "<<hbw_verify_memory_region(vecIntPtr+sizeof(vecIntPtr), vecIntPtr->size()*sizeof(int), 0)<<" (0 is HBM, -1 otherwise)"<<std::endl;
duration = std::chrono::duration_cast<std::chrono::microseconds>(end-start).count();
std::cout<<"Time needed: "<<duration<<std::endl;
auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(end-start).count();
std::cout<<"Time needed (std element alloc): "<<duration<<"ns"<<std::endl;
}
#endif
std::vector<int> normalVec;
std::vector<int>* normalVecPtr = &normalVec;
/*TEST_CASE("Timings for partitioned joins with PipeFabric (PT)", "[Join PT]") {
int numThreads = omp_get_max_threads();
std::cout<<"Hardware supports up to "<<numThreads<<" threads."<<std::endl;
start = std::chrono::high_resolution_clock::now();
for(auto i=0; i<nrOfIter; i++) {
normalVecPtr->push_back(i);
}
end = std::chrono::high_resolution_clock::now();
int numPartitions = 2; int run = 1;
REQUIRE(normalVecPtr->size() == nrOfIter);
while (numPartitions<numThreads) {
numPartitions = pow(2, run);
run++;
std::cout<<"Memory Region Vector std allocator: "<<hbw_verify_memory_region(normalVecPtr, (sizeof(normalVecPtr)+normalVecPtr->size()*sizeof(int)), 0)<<" (0 is HBM, -1 otherwise)"<<std::endl;
duration = std::chrono::duration_cast<std::chrono::microseconds>(end-start).count();
std::cout<<"Time needed: "<<duration<<std::endl;
}
std::chrono::high_resolution_clock::time_point start, end;
typedef TuplePtr<int, std::string, double> tpPtr;
StreamGenerator<tpPtr>::Generator streamGen ([](unsigned long n) -> tpPtr {
return makeTuplePtr((int)n, (std::string)"This is a test string", (double)n + 0.5); });
PFabricContext ctx;
auto t = ctx.createTopology();
unsigned long res = 0;
auto s1 = t->streamFromGenerator<tpPtr>(streamGen, nrOfIter)
.keyBy<int>([](auto tp) { return get<0>(tp); })
;
auto s2 = t->streamFromGenerator<tpPtr>(streamGen, nrOfIter)
.keyBy<int>([](auto tp) { return get<0>(tp); })
.partitionBy([&](auto tp) { return get<0>(tp)%numPartitions; }, numPartitions)
.join<int>(s1, [](auto tp1, auto tp2) { return get<0>(tp1) == get<0>(tp2); })
.merge()
.notify([&](auto tp, bool outdated){
res++;
})
;
start = std::chrono::high_resolution_clock::now();
t->start(false);
end = std::chrono::high_resolution_clock::now();
while(res<nrOfIter) {
std::this_thread::sleep_for(1s);
}
REQUIRE(res==nrOfIter);
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count();
std::cout<<"Time needed (PT("<<numPartitions<<") join): "<<duration<<"ms"<<std::endl;
}
}*/
include(../../cmake/Testing.cmake.in)
#if (BUILD_TEST_CASES)
if(USE_MEMKIND)
#if(USE_MEMKIND)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fopenmp")
do_test(AllocatorTest)
do_test(AllocatorTimingTest)
endif()
do_test(MemJoinTest)
do_test(Mem4JoinTest)
do_test(Mem8JoinTest)
#endif()
#endif()
/*
* Copyright (c) 2014-17 The PipeFabric team,
* All Rights Reserved.
*
* This file is part of the PipeFabric package.
*
* PipeFabric is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License (GPL) as
* published by the Free Software Foundation; either version 2 of
* the License, or (at your option) any later version.
*
* This package is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; see the file LICENSE.
* If not you can find the GPL at http://www.gnu.org/copyleft/gpl.html
*/
#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file
#include "catch.hpp"
#include <chrono> //time
#ifdef USE_MEMKIND
#include "memkind/include/hbw_allocator.h" //hbm
#endif
#include "../pfabric.hpp"
using namespace pfabric;
typedef TuplePtr<int, std::string, double> tpPtr;
StreamGenerator<tpPtr>::Generator streamGen ([](unsigned long n) -> tpPtr {
return makeTuplePtr((int)n, (std::string)"This is a test string", (double)n + 0.5); });
const int nrOfIter = 1000000;
TEST_CASE("Running 16 joins parallel (queue)", "[16JoinsParallel]") {
PFabricContext ctx;
auto t = ctx.createTopology();
unsigned long res = 0;
auto s1 = t->streamFromGenerator<tpPtr>(streamGen, nrOfIter)
.queue()
.keyBy<int>([](auto tp) { return get<0>(tp); })
;
auto s2 = t->streamFromGenerator<tpPtr>(streamGen, nrOfIter)
.queue()
.keyBy<int>([](auto tp) { return get<0>(tp); })
.join<int>(s1, [](auto tp1, auto tp2) { return get<0>(tp1) == get<0>(tp2); })
.keyBy<int>([](auto tp) { return get<0>(tp); })
;
auto s3 = t->streamFromGenerator<tpPtr>(streamGen, nrOfIter)
.queue()
.keyBy<int>([](auto tp) { return get<0>(tp); })
.join<int>(s2, [](auto tp1, auto tp2) { return get<0>(tp1) == get<0>(tp2); })
.keyBy<int>([](auto tp) { return get<0>(tp); })
;
auto s4 = t->streamFromGenerator<tpPtr>(streamGen, nrOfIter)
.queue()
.keyBy<int>([](auto tp) { return get<0>(tp); })
.join<int>(s3, [](auto tp1, auto tp2) { return get<0>(tp1) == get<0>(tp2); })
.keyBy<int>([](auto tp) { return get<0>(tp); })
;
auto s5 = t->streamFromGenerator<tpPtr>(streamGen, nrOfIter)
.queue()
.keyBy<int>([](auto tp) { return get<0>(tp); })
.join<int>(s4, [](auto tp1, auto tp2) { return get<0>(tp1) == get<0>(tp2); })
.keyBy<int>([](auto tp) { return get<0>(tp); })
;
auto s6 = t->streamFromGenerator<tpPtr>(streamGen, nrOfIter)
.queue()
.keyBy<int>([](auto tp) { return get<0>(tp); })
.join<int>(s5, [](auto tp1, auto tp2) { return get<0>(tp1) == get<0>(tp2); })
.keyBy<int>([](auto tp) { return get<0>(tp); })
;
auto s7 = t->streamFromGenerator<tpPtr>(streamGen, nrOfIter)
.queue()
.keyBy<int>([](auto tp) { return get<0>(tp); })
.join<int>(s6, [](auto tp1, auto tp2) { return get<0>(tp1) == get<0>(tp2); })
.keyBy<int>([](auto tp) { return get<0>(tp); })
;
auto s8 = t->streamFromGenerator<tpPtr>(streamGen, nrOfIter)
.queue()
.keyBy<int>([](auto tp) { return get<0>(tp); })
.join<int>(s7, [](auto tp1, auto tp2) { return get<0>(tp1) == get<0>(tp2); })
.keyBy<int>([](auto tp) { return get<0>(tp); })
;
auto s9 = t->streamFromGenerator<tpPtr>(streamGen, nrOfIter)
.queue()
.keyBy<int>([](auto tp) { return get<0>(tp); })
.join<int>(s8, [](auto tp1, auto tp2) { return get<0>(tp1) == get<0>(tp2); })
.keyBy<int>([](auto tp) { return get<0>(tp); })
;
auto s10 = t->streamFromGenerator<tpPtr>(streamGen, nrOfIter)
.queue()
.keyBy<int>([](auto tp) { return get<0>(tp); })
.join<int>(s9, [](auto tp1, auto tp2) { return get<0>(tp1) == get<0>(tp2); })
.keyBy<int>([](auto tp) { return get<0>(tp); })
;
auto s11 = t->streamFromGenerator<tpPtr>(streamGen, nrOfIter)
.queue()
.keyBy<int>([](auto tp) { return get<0>(tp); })
.join<int>(s10, [](auto tp1, auto tp2) { return get<0>(tp1) == get<0>(tp2); })
.keyBy<int>([](auto tp) { return get<0>(tp); })
;
auto s12 = t->streamFromGenerator<tpPtr>(streamGen, nrOfIter)
.queue()
.keyBy<int>([](auto tp) { return get<0>(tp); })
.join<int>(s11, [](auto tp1, auto tp2) { return get<0>(tp1) == get<0>(tp2); })
.keyBy<int>([](auto tp) { return get<0>(tp); })
;
auto s13 = t->streamFromGenerator<tpPtr>(streamGen, nrOfIter)
.queue()
.keyBy<int>([](auto tp) { return get<0>(tp); })
.join<int>(s12, [](auto tp1, auto tp2) { return get<0>(tp1) == get<0>(tp2); })
.keyBy<int>([](auto tp) { return get<0>(tp); })
;
auto s14 = t->streamFromGenerator<tpPtr>(streamGen, nrOfIter)
.queue()
.keyBy<int>([](auto tp) { return get<0>(tp); })
.join<int>(s13, [](auto tp1, auto tp2) { return get<0>(tp1) == get<0>(tp2); })
.keyBy<int>([](auto tp) { return get<0>(tp); })
;
auto s15 = t->streamFromGenerator<tpPtr>(streamGen, nrOfIter)
.queue()
.keyBy<int>([](auto tp) { return get<0>(tp); })
.join<int>(s14, [](auto tp1, auto tp2) { return get<0>(tp1) == get<0>(tp2); })
.keyBy<int>([](auto tp) { return get<0>(tp); })
;
auto s16 = t->streamFromGenerator<tpPtr>(streamGen, nrOfIter)
.queue()
.keyBy<int>([](auto tp) { return get<0>(tp); })
.join<int>(s15, [](auto tp1, auto tp2) { return get<0>(tp1) == get<0>(tp2); })
.notify([&](auto tp, bool outdated){
res++;
})
;
auto start = std::chrono::high_resolution_clock::now();
t->start(false);
auto end = std::chrono::high_resolution_clock::now();
while(res<nrOfIter) {
std::this_thread::sleep_for(1s);
}
REQUIRE(res==nrOfIter);
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count();
std::cout<<"Time needed (Join 16x Q): "<<duration<<"ms"<<std::endl;
}
\ No newline at end of file
/*
* Copyright (c) 2014-17 The PipeFabric team,
* All Rights Reserved.
*
* This file is part of the PipeFabric package.
*
* PipeFabric is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License (GPL) as
* published by the Free Software Foundation; either version 2 of
* the License, or (at your option) any later version.
*
* This package is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; see the file LICENSE.
* If not you can find the GPL at http://www.gnu.org/copyleft/gpl.html
*/
#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file
#include "catch.hpp"
#include <chrono> //time
#ifdef USE_MEMKIND
#include "memkind/include/hbw_allocator.h" //hbm
#endif
#include "../pfabric.hpp"
using namespace pfabric;
typedef TuplePtr<int, std::string, double> tpPtr;
StreamGenerator<tpPtr>::Generator streamGen ([](unsigned long n) -> tpPtr {
return makeTuplePtr((int)n, (std::string)"This is a test string", (double)n + 0.5); });
const int nrOfIter = 1000000;
TEST_CASE("Running 4 joins parallel (queue)", "[4JoinsParallel]") {
PFabricContext ctx;
auto t = ctx.createTopology();
unsigned long res = 0;
auto s1 = t->streamFromGenerator<tpPtr>(streamGen, nrOfIter)
.queue()
.keyBy<int>([](auto tp) { return get<0>(tp); })
;
auto s2 = t->streamFromGenerator<tpPtr>(streamGen, nrOfIter)
.queue()
.keyBy<int>([](auto tp) { return get<0>(tp); })
.join<int>(s1, [](auto tp1, auto tp2) { return get<0>(tp1) == get<0>(tp2); })
.keyBy<int>([](auto tp) { return get<0>(tp); })
;
auto s3 = t->streamFromGenerator<tpPtr>(streamGen, nrOfIter)
.queue()
.keyBy<int>([](auto tp) { return get<0>(tp); })
.join<int>(s2, [](auto tp1, auto tp2) { return get<0>(tp1) == get<0>(tp2); })
.keyBy<int>([](auto tp) { return get<0>(tp); })
;
auto s4 = t->streamFromGenerator<tpPtr>(streamGen, nrOfIter)
.queue()
.keyBy<int>([](auto tp) { return get<0>(tp); })
.join<int>(s3, [](auto tp1, auto tp2) { return get<0>(tp1) == get<0>(tp2); })
.notify([&](auto tp, bool outdated){
res++;
})
;
auto start = std::chrono::high_resolution_clock::now();
t->start(false);
auto end = std::chrono::high_resolution_clock::now();
while(res<nrOfIter) {
std::this_thread::sleep_for(1s);
}
REQUIRE(res==nrOfIter);
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count();
std::cout<<"Time needed (Join 4x Q): "<<duration<<"ms"<<std::endl;
}
TEST_CASE("Running 4 joins singlethreaded", "[4JoinsST]") {
PFabricContext ctx;
auto t = ctx.createTopology();
unsigned long res = 0;
auto s1 = t->streamFromGenerator<tpPtr>(streamGen, nrOfIter)
.keyBy<int>([](auto tp) { return get<0>(tp); })
;
auto s2 = t->streamFromGenerator<tpPtr>(streamGen, nrOfIter)
.keyBy<int>([](auto tp) { return get<0>(tp); })
.join<int>(s1, [](auto tp1, auto tp2) { return get<0>(tp1) == get<0>(tp2); })
.keyBy<int>([](auto tp) { return get<0>(tp); })
;
auto s3 = t->streamFromGenerator<tpPtr>(streamGen, nrOfIter)
.keyBy<int>([](auto tp) { return get<0>(tp); })
.join<int>(s2, [](auto tp1, auto tp2) { return get<0>(tp1) == get<0>(tp2); })
.keyBy<int>([](auto tp) { return get<0>(tp); })
;
auto s4 = t->streamFromGenerator<tpPtr>(streamGen, nrOfIter)
.keyBy<int>([](auto tp) { return get<0>(tp); })
.join<int>(s3, [](auto tp1, auto tp2) { return get<0>(tp1) == get<0>(tp2); })
.notify([&](auto tp, bool outdated){
res++;
})
;
auto start = std::chrono::high_resolution_clock::now();
t->start(false);
auto end = std::chrono::high_resolution_clock::now();
while(res<nrOfIter) {
std::this_thread::sleep_for(1s);
}
REQUIRE(res==nrOfIter);
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count();
std::cout<<"Time needed (Join 4x ST): "<<duration<<"ms"<<std::endl;
}
/*
* Copyright (c) 2014-17 The PipeFabric team,
* All Rights Reserved.
*
* This file is part of the PipeFabric package.
*
* PipeFabric is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License (GPL) as
* published by the Free Software Foundation; either version 2 of
* the License, or (at your option) any later version.
*
* This package is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; see the file LICENSE.
* If not you can find the GPL at http://www.gnu.org/copyleft/gpl.html
*/
#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file
#include "catch.hpp"
#include <chrono> //time
#ifdef USE_MEMKIND
#include "memkind/include/hbw_allocator.h" //hbm
#endif
#include "../pfabric.hpp"
using namespace pfabric;
typedef TuplePtr<int, std::string, double> tpPtr;
StreamGenerator<tpPtr>::Generator streamGen ([](unsigned long n) -> tpPtr {
return makeTuplePtr((int)n, (std::string)"This is a test string", (double)n + 0.5); });
const int nrOfIter = 1000000;
TEST_CASE("Running 8 joins parallel (queue)", "[8JoinsParallel]") {
PFabricContext ctx;
auto t = ctx.createTopology();
unsigned long res = 0;
auto s1 = t->streamFromGenerator<tpPtr>(streamGen, nrOfIter)
.queue()
.keyBy<int>([](auto tp) { return get<0>(tp); })
;
auto s2 = t->streamFromGenerator<tpPtr>(streamGen, nrOfIter)
.queue()
.keyBy<int>([](auto tp) { return get<0>(tp); })
.join<int>(s1, [](auto tp1, auto tp2) { return get<0>(tp1) == get<0>(tp2); })