Commit 191639ca authored by Philipp Götze's avatar Philipp Götze
Browse files

Some adaptions to build process for LinearRoad usecase

parent 00b48f5e
......@@ -70,7 +70,7 @@ include_directories("${THIRD_PARTY_DIR}/fmt")
#--------------------------------------------------------------------------------
# the SimpleWeb library
download_project(PROJ SimpleWeb
GIT_REPOSITORY https://github.com/eidheim/Simple-Web-Server.git
GIT_REPOSITORY https://gitlab.com/eidheim/Simple-Web-Server.git
GIT_TAG master
GIT_SHALLOW 1
UPDATE_DISCONNECTED 1
......@@ -147,8 +147,8 @@ file(COPY ${PROJECT_BINARY_DIR}/data-src/DEBS2017
)
# Linear Road Data Driver
download_project(PROJ linroad
GIT_REPOSITORY https://github.com/samsonxian/Linear-Road-Benchmark-Data-Driver.git
download_project(PROJ linroad
GIT_REPOSITORY https://github.com/yxian29/Linear-Road-Benchmark-Data-Driver.git
GIT_TAG master
GIT_SHALLOW 1
UPDATE_DISCONNECTED 1
......@@ -157,7 +157,7 @@ download_project(PROJ linroad
add_custom_command(
OUTPUT ${THIRD_PARTY_DIR}/linroad
COMMAND ${CMAKE_COMMAND} -E copy
${PROJECT_SOURCE_DIR}/usecases/LinearRoad/CMakeLists.txt
${PROJECT_SOURCE_DIR}/usecases/LinearRoad/DataProvider/CMakeLists.txt
${linroad_SOURCE_DIR}/src
COMMAND ${CMAKE_COMMAND} -E chdir ${linroad_SOURCE_DIR}/src cmake .
COMMAND ${CMAKE_COMMAND} -E chdir ${linroad_SOURCE_DIR}/src $(MAKE)
......@@ -174,6 +174,7 @@ add_custom_command(
${linroad_SOURCE_DIR}/src/LRDataProvider.h
${THIRD_PARTY_DIR}/linroad/include
)
add_custom_target(linroad ALL DEPENDS ${THIRD_PARTY_DIR}/linroad)
endif()
#--------------------------------------------------------------------------------
......
......@@ -23,9 +23,9 @@ option(USE_KAFKA "use Apache Kafka as network source"
option(USE_MQTT "use MQTT as network source" OFF)
option(USE_BOOST_SPIRIT_PARSER "use the boost::spirit::qi parsers (strings convertion)" ON )
option(USE_ROCKSDB_TABLE "use RocksDB for implementing persistent tables" OFF)
option(USE_NVM_TABLE "use NVM for implementing persistent memory tables" OFF)
option(USE_NVM_TABLES "use NVM for implementing persistent memory tables" ON )
option(BUILD_ONLY_LIBS "build only the two pipefabric libraries" ON )
option(BUILD_TEST_CASES "build tests for pipefabric functionality" OFF)
option(BUILD_TEST_CASES "build tests for pipefabric functionality" ON )
option(BUILD_GOOGLE_BENCH "build google benchmark" OFF)
option(BUILD_BENCHMARKS "build benchmark test for pipefabric" OFF)
option(BUILD_PYTHON "build python interface for pipefabric" OFF)
......@@ -95,28 +95,6 @@ set(core_sources
)
set(core_libs "")
if(BUILD_USE_CASES)
add_definitions(-DBUILD_USE_CASES)
# Use case for linear road
include_directories("${THIRD_PARTY_DIR}/linroad/include")
set(LINROAD_LIB "${THIRD_PARTY_DIR}/linroad/lib/libLRDataProvider.a")
file(COPY ${PROJECT_SOURCE_DIR}/usecases/LinearRoad/LinearRoadTest.cpp
DESTINATION ${PROJECT_SOURCE_DIR}/test
)
set(core_sources
${core_sources}
${THIRD_PARTY_DIR}/linroad
)
set(core_libs
${core_libs}
${LINROAD_LIB}
)
if(SUPPORT_MATRICES)
# Use cases require matrix support (image and graph processing)
set(SUPPORT_MATRICES ON)
endif()
endif()
############################################################################################
......@@ -190,9 +168,9 @@ endif()
############################################################################################
#
if (USE_NVM_TABLE)
if (USE_NVM_TABLES)
message(STATUS "using NVM based persistent table")
add_definitions(-DUSE_NVM_TABLE)
add_definitions(-DUSE_NVM_TABLES)
set (PMDK_LIBRARIES
"${THIRD_PARTY_DIR}/pmdk/lib/libpmemblk.a"
"${THIRD_PARTY_DIR}/pmdk/lib/libpmemlog.a"
......@@ -209,7 +187,6 @@ if (USE_NVM_TABLE)
set(core_sources
${core_sources}
${THIRD_PARTY_DIR}/pmdk
${THIRD_PARTY_DIR}/pmdk-cpp
${THIRD_PARTY_DIR}/nvmDS
)
set(core_libs
......@@ -422,6 +399,23 @@ if(USE_MQTT)
)
endif()
############################################################################################
# Linear Road Data Provider #
############################################################################################
#
if(BUILD_USE_CASES)
include_directories(${THIRD_PARTY_DIR}/linroad/include)
set(LINROAD_LIB "${THIRD_PARTY_DIR}/linroad/lib/libLRDataProvider.a")
set(core_libs
${core_libs}
${LINROAD_LIB}
)
set(core_deps
${core_deps}
linroad
)
endif()
############################################################################################
# Building PipeFabric core library #
......@@ -445,13 +439,9 @@ set(core_sources
table/StateContext.cpp
)
add_library(pfabric_core SHARED
${core_sources}
)
target_link_libraries(pfabric_core
${core_libs}
)
add_library(pfabric_core SHARED ${core_sources})
target_link_libraries(pfabric_core ${core_libs})
add_dependencies(pfabric_core ${core_deps})
############################################################################################
......@@ -530,6 +520,16 @@ if(NOT BUILD_ONLY_LIB)
endif()
############################################################################################
# Build use cases #
############################################################################################
#
if(BUILD_USE_CASES)
add_definitions(-DBUILD_USE_CASES)
add_subdirectory(usecases)
endif()
############################################################################################
# Unit tests using Catch #
############################################################################################
......@@ -542,18 +542,6 @@ enable_testing()
add_subdirectory(test)
############################################################################################
# Build use cases #
############################################################################################
#
if(BUILD_USE_CASES)
add_subdirectory(usecases)
add_executable(LinRoad usecases/LinearRoad/LinRoad.cpp)
target_link_libraries(LinRoad pfabric_core)
endif()
############################################################################################
# Installation #
############################################################################################
......
if(BUILD_USE_CASES)
# add_subdirectory(DEBS2017)
# add_subdirectory(MatrixProcessing)
# add_subdirectory(FreqTrajectories)
# add_subdirectory(SECRET)
add_subdirectory(TxSupport)
#add_subdirectory(DEBS2017)
if(SUPPORT_MATRICES)
#add_subdirectory(MatrixProcessing)
endif()
#add_subdirectory(FreqTrajectories)
#add_subdirectory(SECRET)
add_subdirectory(LinearRoad)
add_subdirectory(TxSupport)
cmake_minimum_required (VERSION 2.8.11)
message("Building Driver for Linear Road")
file(COPY ${PROJECT_SOURCE_DIR}/usecases/LinearRoad/LinearRoadTest.cpp
DESTINATION ${CMAKE_CURRENT_SOURCE_DIR}/test
)
project(DataDriver)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread")
add_library(LRDataProvider LRDataProvider.cpp MemTuples.cpp Tuple.cpp)
target_include_directories (LRDataProvider PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})
add_executable(LinRoad LinRoad.cpp)
target_link_libraries(LinRoad pfabric_core)
cmake_minimum_required (VERSION 2.8.11)
message("Building Driver for Linear Road")
project(DataDriver)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread")
add_library(LRDataProvider LRDataProvider.cpp MemTuples.cpp Tuple.cpp)
target_include_directories (LRDataProvider PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})
......@@ -70,7 +70,7 @@ typedef TuplePtr<ReportType, Time, VID, Spd, XWay, Lane, Dir, Seg, Pos, int, int
/* ----------------------------------------------------------------- */
//all necessary information of a position report
struct p {
struct PosReport {
Time t;
VID v;
Spd spd;
......@@ -82,7 +82,7 @@ struct p {
};
//all necessary information of an accident alert
struct accAlert {
struct AccAlert {
ReportType repType;
Time t;
Emit t_emit;
......@@ -90,7 +90,7 @@ struct accAlert {
};
//all necessary information of a toll notification
struct tollNote {
struct TollNote {
ReportType repType;
VID v;
Time t;
......@@ -100,21 +100,21 @@ struct tollNote {
};
//used as a key in the map posReports
struct posID {
struct PosID {
Time t;
VID v;
//overwrite existing operators for == and < to use this struct as a key in a map
bool const operator==(const posID &o) const {
bool const operator==(const PosID &o) const {
return t == o.t && v == o.v;
}
bool const operator<(const posID &o) const {
bool const operator<(const PosID &o) const {
return v < o.v || (v == o.v && t < o.t);
}
};
//used as data struct in the accidents set
struct accidentID {
struct AccidentID {
XWay x;
Seg s;
VID v;
......@@ -122,32 +122,32 @@ struct accidentID {
Dir d;
//overwrite existing operators for == and < to use this struct as a key in a map
bool const operator==(const accidentID &o) const {
bool const operator==(const AccidentID &o) const {
return s == o.s && x == o.x && v == o.v && m == o.m && d == o.d;
}
bool const operator<(const accidentID &o) const {
bool const operator<(const AccidentID &o) const {
return x < o.x || (x == o.x && s < o.s) || (x == o.x && s == o.s && v < o.v) || (x == o.x && s == o.s && v == o.v && m < o.m) || (x == o.x && s == o.s && v == o.v && m == o.m && d < o.d);
}
};
//used as a key in the map spdOfSegments
struct segID {
struct SegID {
Minute m;
XWay x;
Seg s;
Dir d;
//overwrite existing operators for == and < to use this struct as a key in a map
bool const operator==(const segID &o) const {
bool const operator==(const SegID &o) const {
return m == o.m && x == o.x && s == o.s && d == o.d;
}
bool const operator<(const segID &o) const {
bool const operator<(const SegID &o) const {
return m < o.m || (m == o.m && x < o.x) || (m == o.m && x == o.x && s < o.s) || (m == o.m && x == o.x && s == o.s && d < o.d);
}
};
//used as data struct to calculate average speed of an vehicle
struct avgSpd {
struct AvgSpd {
Spd sum;
int n;
};
......@@ -158,14 +158,14 @@ struct avgSpd {
/* ----------------------------------------------------------------- */
//map containing all position reports
map<posID, p> posReports;
map<PosID, PosReport> posReports;
//current segment of each vehicle
map<VID,Seg> segs;
//holds all current accidents
set<accidentID> accidents;
set<AccidentID> accidents;
//used to calculate average speed on each segment
//also used to get number of cars at one segment
map<segID,map<VID,avgSpd>> spdOfSegments;
map<SegID,map<VID,AvgSpd>> spdOfSegments;
/* ----------------------------------------------------------------- */
......@@ -173,7 +173,7 @@ map<segID,map<VID,avgSpd>> spdOfSegments;
/* ----------------------------------------------------------------- */
/* Helper function to print an element of p. */
void printPosReport(p posReport) {
void printPosReport(PosReport posReport) {
cout << "Time: " << posReport.t << ", ";
cout << "VID: " << posReport.v << ", ";
cout << "Spd: " << posReport.spd << ", ";
......@@ -184,14 +184,14 @@ void printPosReport(p posReport) {
cout << "Pos: " << posReport.pos << endl;
}
/* Helper function to print an element of posID. */
void printPosReportKey(posID key) {
/* Helper function to print an element of PosID. */
void printPosReportKey(PosID key) {
cout << "Time: " << key.t << ", ";
cout << "VID: " << key.v << endl;
}
/* Helper function to print an accident alert. */
void printAccidentAlert(accAlert alert) {
void printAccidentAlert(AccAlert alert) {
cout << "ReportType: " << alert.repType << ", ";
cout << "Time: " << alert.t << ", ";
cout << "Emit: " << alert.t_emit << ", ";
......@@ -199,7 +199,7 @@ void printAccidentAlert(accAlert alert) {
}
/* Helper function to print a toll notification. */
void printTollNotification(tollNote note) {
void printTollNotification(TollNote note) {
cout << "ReportType: " << note.repType << ", ";
cout << "VID: " << note.v << ", ";
cout << "Time: " << note.t << ", ";
......@@ -240,7 +240,7 @@ void eraseFromPosReports(Time t, VID v) {
activePosReports--;
//get the key of the pos report
posID key = {t, v};
PosID key = {t, v};
//erase from posReports
posReports.erase(key);
}
......@@ -252,15 +252,15 @@ void eraseFromSegs(VID v) {
}
/* Helper function to add a tuple to the pos reports map. */
void addToPosReports(p posReport) {
void addToPosReports(PosReport posReport) {
//only for testing purposes
activePosReports++;
//get the key of the pos report from tuple
posID key = {posReport.t, posReport.v};
PosID key = {posReport.t, posReport.v};
//add to posReports
posReports.insert(pair<posID,p>(key,posReport));
posReports.insert(pair<PosID,PosReport>(key,posReport));
}
//TODO: should be done during outdated notification
......@@ -291,28 +291,28 @@ Minute M(Time t) {
}
/* Denotes the i'th position report emitted by v prior to t. */
p Last(int i, VID v, Time t) {
PosReport Last(int i, VID v, Time t) {
//get the bounds for the iterator
Time timeLowerBound = t - 30 * i;
Time timeUpperBound = (t - 30 * (i - 1)) - 1;
posID keyLowerBound = {timeLowerBound, v};
posID keyUpperBound = {timeUpperBound, v};
PosID keyLowerBound = {timeLowerBound, v};
PosID keyUpperBound = {timeUpperBound, v};
//get a fitting posReport
for (std::map<posID,p>::iterator it = posReports.lower_bound(keyLowerBound); it != posReports.upper_bound(keyUpperBound); ++it) {
for (std::map<PosID,PosReport>::iterator it = posReports.lower_bound(keyLowerBound); it != posReports.upper_bound(keyUpperBound); ++it) {
//key
posID key = it -> first;
PosID key = it -> first;
//element
p posReport = it -> second;
PosReport posReport = it -> second;
return posReport;
}
//automatically assigns the corresponding NULL values to all fields in p
//if no fitting posReport was found
struct p nullStruct = {0};
struct PosReport nullStruct = {0};
return nullStruct;
}
......@@ -321,7 +321,7 @@ p Last(int i, VID v, Time t) {
bool Stop(VID v, Time t, XWay x, Lane l, Pos pos, Dir d) {
for (int i = 1; i <= 4; i++) {
p last = Last(i, v, t);
PosReport last = Last(i, v, t);
if ((last.x != x) || (last.l != l) || (last.pos != pos) || (last.d != d)) {
return false;
}
......@@ -330,10 +330,10 @@ bool Stop(VID v, Time t, XWay x, Lane l, Pos pos, Dir d) {
}
/* Returns the segment if there was an accident in the segment that is exactly i segments downstream of s, in expressway x and in the travel lanes for direction d during minute m. */
Seg DetectAccident(p posReport) {
Seg DetectAccident(PosReport posReport) {
//get accident key
accidentID accID = {posReport.x, posReport.s, posReport.v, M(posReport.t), posReport.d};
AccidentID accID = {posReport.x, posReport.s, posReport.v, M(posReport.t), posReport.d};
//only insert into accidents, if lane equals 'TRAVEL'
if (posReport.l != 0 && posReport.l != 4) {
......@@ -353,8 +353,8 @@ Seg DetectAccident(p posReport) {
//if more than one entry, an accident occured
int numEntriesFound = 0;
//check if accident occured
for (set<accidentID>::iterator it = accidents.begin(); it != accidents.end(); ++it) {
accidentID accID = *it;
for (set<AccidentID>::iterator it = accidents.begin(); it != accidents.end(); ++it) {
AccidentID accID = *it;
//check if entry fits XWay and segment
//therefore, we need the direction, to either look upstream or downstream
......@@ -380,9 +380,9 @@ Seg DetectAccident(p posReport) {
}
/* Add pos report to map spdOfSegments, used to calculate avg spd. */
void addToSpeedEntries(p posReport) {
void addToSpeedEntries(PosReport posReport) {
segID key = {M(posReport.t), posReport.x, posReport.s, posReport.d};
SegID key = {M(posReport.t), posReport.x, posReport.s, posReport.d};
//try to get the current entry for this segment in this minute
try {
......@@ -400,25 +400,25 @@ void addToSpeedEntries(p posReport) {
} catch (const out_of_range& oor) {
//no entry for this vehicle existed, create new one
avgSpd avgSpdEntry = {posReport.spd, 1};
(*segMap).insert(pair<VID, avgSpd>(posReport.v, avgSpdEntry));
AvgSpd AvgSpdEntry = {posReport.spd, 1};
(*segMap).insert(pair<VID, AvgSpd>(posReport.v, AvgSpdEntry));
}
} catch (const out_of_range& oor) {
//no entry for this segment in this minute existed
//create new speed entry
avgSpd avgSpdEntry = {posReport.spd, 1};
AvgSpd AvgSpdEntry = {posReport.spd, 1};
//create new segment map value and insert speed entry
map<VID,avgSpd> segMap;
segMap.insert(pair<VID, avgSpd>(posReport.v, avgSpdEntry));
map<VID,AvgSpd> segMap;
segMap.insert(pair<VID, AvgSpd>(posReport.v, AvgSpdEntry));
//insert new seg map
spdOfSegments.insert(pair<segID,map<VID,avgSpd>>(key, segMap));
spdOfSegments.insert(pair<SegID,map<VID,AvgSpd>>(key, segMap));
}
}
/* Specifies the average speed of all vehicles that emitted a position report from segment s of expressway x in direction d during minute m. */
float Avgs(Minute m, XWay x, Seg s, Dir d) {
segID key = {m, x, s, d};
SegID key = {m, x, s, d};
float sumSpeeds = 0.0;
float numSpeeds = 0.0;
......@@ -427,9 +427,9 @@ float Avgs(Minute m, XWay x, Seg s, Dir d) {
//throws an out-of-range exception if element does not exist
auto segMap = &spdOfSegments.at(key);
//iterate over map to calculate average speed of all vehicles
for (std::map<VID,avgSpd>::iterator it=(*segMap).begin(); it!=(*segMap).end(); ++it) {
for (std::map<VID,AvgSpd>::iterator it=(*segMap).begin(); it!=(*segMap).end(); ++it) {
//spd entry
avgSpd spdEntry = it -> second;
AvgSpd spdEntry = it -> second;
//get avg speed and increase counter
sumSpeeds += static_cast<float>(spdEntry.sum) / static_cast<float>(spdEntry.n);
numSpeeds += 1.0;
......@@ -468,7 +468,7 @@ Spd Lav(Minute m, XWay x, Seg s, Dir d) {
/* Returns the set of all vehicles that emit position reports from segment s on expressway x while traveling in direction d during minute m. */
int Cars(Minute m, XWay x, Seg s, Dir d) {
segID key = {m, x, s, d};
SegID key = {m, x, s, d};
//try to get the current entry for this segment in this minute
try {
//throws an out-of-range exception if element does not exist
......@@ -525,7 +525,7 @@ int main(int argc, char* argv[]) {
.where([](auto tp, bool outdated) {
//get posReport from tuple
p posReport = {get<1>(tp), get<2>(tp), get<3>(tp), get<4>(tp), get<5>(tp), get<6>(tp), get<7>(tp), get<8>(tp)};
PosReport posReport = {get<1>(tp), get<2>(tp), get<3>(tp), get<4>(tp), get<5>(tp), get<6>(tp), get<7>(tp), get<8>(tp)};
if(posReport.t > 12000) {
cout << "Problem with time: " << posReport.t << endl;
......@@ -570,7 +570,7 @@ int main(int argc, char* argv[]) {
}
//print toll notification
tollNote note = {0, posReport.v, posReport.t, globalTimeSeconds, lav, toll};
TollNote note = {0, posReport.v, posReport.t, globalTimeSeconds, lav, toll};
//normally, all toll notes would have to be printed
if (toll > 0) {
......@@ -579,7 +579,7 @@ int main(int argc, char* argv[]) {
if (accSegment != -1) {
//generate accident alert
accAlert alert = {1, posReport.t, globalTimeSeconds, accSegment};
AccAlert alert = {1, posReport.t, globalTimeSeconds, accSegment};
printAccidentAlert(alert);
}
}
......
/*
* Copyright (C) 2014-2019 DBIS Group - TU Ilmenau, All Rights Reserved.
*
* This file is part of the PipeFabric package.
*
* PipeFabric is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* PipeFabric is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with PipeFabric. If not, see <http://www.gnu.org/licenses/>.
*/
#include "catch.hpp"
#include "core/Tuple.hpp"
#include "dsl/Topology.hpp"
#include "dsl/Pipe.hpp"
#include "dsl/PFabricContext.hpp"
using namespace pfabric;
/* Use case based on the paper "Linear Road: A Stream Data Management Benchmark" from
* Arvind Arasu, Mitch Cherniack, Eduardo Galvez, David Maier, Anurag Maskey, Esther Ryvkina,
* Michael Stonebraker and Richard Tibbetts, in Proceedings of the 30th International Conference
* on Very Large Data Bases (VLDB), August, 2004.
*/
/*
* This test case runs the linear road producer from PipeFabric with the sample file (280 lines).
* It is important to mention that this test case takes some time because the tuples are delivered
* according to their timestamps.
*
* Hint: If this test is executed directly (./LinearRoadTest), you have to switch into /build/test
* folder before.
*/
TEST_CASE("Running the linear road producer with sample file", "[LinearRoad]") {
typedef TuplePtr<int, int, int, int, int,
int, int, int, int, int,
int, int, int, int, int> lrTuples; //format, see Linear Road benchmark
PFabricContext ctx;
auto t = ctx.createTopology();
std::vector<int> res;
auto s = t->newStreamFromLinRoad<lrTuples>("../3rdparty/linroad/data/datafile20seconds.dat")
.notify([&res](auto tp, bool outdated) {
int v = get<0>(tp);
res.push_back(v);
})
;
t->start(true);
t->wait();
REQUIRE(res.size() == 280);
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment