Commit 0f0733ee authored by Kai-Uwe Sattler's avatar Kai-Uwe Sattler
Browse files

Several modifications

parent a5b588ee
......@@ -39,7 +39,7 @@ Topology t;
auto s = t.newStreamFromFile("file.csv")
.extract<T1>(',')
.where([](auto tp, bool outdated) { return get<0>(tp) % 2 == 0; } )
.map<T2>([](auto tp) -> T2 {
.map<T2>([](auto tp, bool) -> T2 {
return makeTuplePtr(get<2>(tp), get<0>(tp));
})
.print(std::cout);
......
......@@ -10,4 +10,23 @@ if (NOT BUILD_ONLY_LIBS)
${Boost_FILESYSTEM_LIBRARY}
${Boost_THREAD_LIBRARY}
)
add_executable(TrackerServer TrackerServer.cpp
TrajectoryDB.cpp
GeoUtils.cpp
QueryLoop.cpp
WebServer.cpp)
target_link_libraries(TrackerServer
pfabric_qcomp
pfabric_core
${ROCKSDB_LIB}
${Boost_PROGRAM_OPTIONS_LIBRARY}
${Boost_DATE_TIME_LIBRARY}
${Boost_FILESYSTEM_LIBRARY}
${Boost_SYSTEM_LIBRARY}
${Boost_LOG_LIBRARY}
${Boost_FILESYSTEM_LIBRARY}
${Boost_THREAD_LIBRARY}
)
endif()
//
// WebServer.hpp
// pipefabric
//
// Created by Kai-Uwe Sattler on 05.09.17.
//
//
#ifndef WebServer_hpp_
#define WebServer_hpp_
#include "SimpleWeb/server_http.hpp"
#include "SimpleWeb/utility.hpp"
typedef SimpleWeb::Server<SimpleWeb::HTTP> HttpServer;
extern std::shared_ptr<std::thread> runWebServer(HttpServer& server, const std::string& webRoot);
#endif /* WebServer_hpp_ */
......@@ -26,18 +26,13 @@ using namespace pfabric;
PFabricContext::PFabricContext() {
}
PFabricContext::~PFabricContext() {}
PFabricContext::~PFabricContext() {
}
PFabricContext::TopologyPtr PFabricContext::createTopology() {
return std::make_shared<Topology>();
}
bool PFabricContext::tableExists(const std::string& tblName) const {
// look for the table
auto it = mTableSet.find(tblName);
return (it != mTableSet.end());
}
TableInfoPtr PFabricContext::getTableInfo(const std::string& tblName) {
// look for the table
auto it = mTableSet.find(tblName);
......
......@@ -134,15 +134,14 @@ public:
// if found then we return it
return std::static_pointer_cast<Table<RecordType, KeyType>>(it->second);
}
else
else {
// otherwise we just return an empty pointer
// TODO: shouldn't we throw an exception here???
std::cout << "table '" << tblName << "' not found" << std::endl;
return std::shared_ptr<Table<RecordType, KeyType>>();
}
}
bool tableExists(const std::string& tblName) const;
TableInfoPtr getTableInfo(const std::string& tblName);
/**
......
......@@ -651,7 +651,7 @@ class Pipe {
/**
* TODO
*/
Pipe<BatchPtr<T>> batch(std::size_t bsize) throw(TopologyException) {
Pipe<BatchPtr<T>> batch(std::size_t bsize = SIZE_MAX) throw(TopologyException) {
auto op = std::make_shared<Batcher<T>>(bsize);
auto iter = addPublisher<Batcher<T>, DataSource<T>>(op);
return Pipe<BatchPtr<T>>(dataflow, iter, keyExtractor, timestampExtractor,
......@@ -1330,20 +1330,31 @@ class Pipe {
Pipe<T> updateTable(
std::shared_ptr<Table<typename RecordType::element_type, KeyType>> tbl,
std::function<bool(const T&, bool,
const typename RecordType::element_type&)>
updateFunc) throw(TopologyException) {
typename RecordType::element_type&)>
updateFunc,
std::function<typename RecordType::element_type(const T&)> insertFunc
) throw(TopologyException) {
typedef std::function<KeyType(const T&)> KeyExtractorFunc;
assert(partitioningState == NoPartitioning);
try {
KeyExtractorFunc keyFunc =
boost::any_cast<KeyExtractorFunc>(keyExtractor);
return map<T, T>([=](auto tp, bool outdated) {
return map<T>([=](auto tp, bool outdated) {
KeyType key = keyFunc(tp);
tbl->updateOrDeleteByKey(
key, [=](const typename RecordType::element_type& old) -> bool {
if (!outdated)
tbl->updateOrDeleteByKey(
key, [=](typename RecordType::element_type& old) -> bool {
return updateFunc(tp, outdated, old);
});
},
[=]() -> typename RecordType::element_type {
return insertFunc(tp);
});
else
tbl->updateOrDeleteByKey(
key, [=](typename RecordType::element_type& old) -> bool {
return updateFunc(tp, outdated, old);
});
return tp;
});
} catch (boost::bad_any_cast& e) {
......
......@@ -79,6 +79,15 @@ void Topology::wait() {
f.get();
}
void Topology::runEvery(const std::chrono::seconds& secs) {
wakeupTimers.push_back(std::thread([&](){
while(true) {
std::this_thread::sleep_for(secs);
startAsync();
}
}));
}
Pipe<TStringPtr> Topology::newStreamFromFile(const std::string& fname, unsigned long limit) {
// create a new TextFileSource
auto op = std::make_shared<TextFileSource>(fname, limit);
......
......@@ -26,6 +26,7 @@
#include <vector>
#include <future>
#include <mutex>
#include <chrono>
#include "core/Tuple.hpp"
......@@ -62,11 +63,11 @@ namespace pfabric {
* auto s = t->newStreamFromFile("file.csv")
* .extract<T1>(',')
* .where<T1>([](auto tp, bool outdated) {
* return getAttribute<0>(*tp) % 2 == 0;
* return get<0>(tp) % 2 == 0;
* })
* .map<T1,T2>([](auto tp) -> T2 {
* return makeTuplePtr(getAttribute<2>(*tp),
* getAttribute<0>(*tp));
* .map<T1,T2>([](auto tp, bool) -> T2 {
* return makeTuplePtr(get<2>(tp),
* get<0>(tp));
* })
* .print<T2>(strm);
* // now, let's start the processing
......@@ -83,6 +84,7 @@ namespace pfabric {
std::vector<StartupFunc> prepareList; //< the list of functions to be called for startup
bool asyncStarted; //< true if we started asynchronously
std::vector<std::future<unsigned long> > startupFutures; //< futures for the startup functions
std::vector<std::thread> wakeupTimers; //< threads for runEvery queries
std::mutex mMutex; //< mutex for accessing startupFutures
DataflowPtr dataflow;
......@@ -135,6 +137,18 @@ namespace pfabric {
void prepare();
/**
* @brief Runs the topology periodically every @c secs seconds.
*
* Starts the processing of the topology every @c secs seconds. Note,
* that the topology should be a finite query not a continuous stream
* query.
*
* @param[in] secs
* the period of time between two invocations
*/
void runEvery(const std::chrono::seconds& secs);
/**
* @brief Waits until the execution of the topology stopped.
*
......
......@@ -364,6 +364,7 @@ std::string QueryCompiler::compileCppCode(
<< "-o " << lib_path.string() << "/lib" << fileName
<< ".dylib -install_name @rpath/lib" << fileName << ".dylib "
<< lib_path / fileName << ".cpp " << ldflags << " " << libs << std::endl;
// std::cout << cmd.str() << std::endl;
std::system(cmd.str().c_str());
return fileName;
}
......@@ -165,7 +165,7 @@ sql::SQLQueryPtr SQLParser::parse(const std::string& stmt) throw(QueryCompileExc
bool r = phrase_parse(iter, end, parser, space, *res);
if (r && iter == end) {
std::cout << "parsing ok: " << *res << std::endl;
// success
}
else {
std::cout << "parsing failed: " << std::string(iter, end) << std::endl;
......
......@@ -41,7 +41,8 @@ private:
PFABRIC_UNARY_TRANSFORM_TYPEDEFS(InputStreamElement, OutputStreamElement)
public:
Batcher(std::size_t batchSize) : mBatchSize(batchSize), mPos(0), mBuf(batchSize) {}
Batcher(std::size_t batchSize = SIZE_MAX) : mBatchSize(batchSize), mPos(0),
mBuf(batchSize != SIZE_MAX ? batchSize : 0) {}
/**
* @brief Bind the callback for the data channel.
......@@ -66,6 +67,8 @@ private:
* the incoming punctuation tuple
*/
void processPunctuation( const PunctuationPtr& punctuation ) {
if (mBatchSize == SIZE_MAX)
publishBatch();
this->getOutputPunctuationChannel().publish(punctuation);
}
......@@ -80,15 +83,28 @@ private:
* flag indicating whether the tuple is new or invalidated now
*/
void processDataElement( const InputStreamElement& data, const bool outdated ) {
// ensure capacity
if (mBatchSize == SIZE_MAX)
mBuf.resize(mPos + 1);
mBuf[mPos++] = std::make_pair(data, outdated);
if (mPos == mBatchSize) {
publishBatch();
/*
auto tup = makeTuplePtr(std::move(mBuf));
this->getOutputDataChannel().publish(tup, false);
mPos = 0;
mBuf.resize(mBatchSize);
*/
}
}
void publishBatch() {
auto tup = makeTuplePtr(std::move(mBuf));
this->getOutputDataChannel().publish(tup, false);
mPos = 0;
mBuf.resize(mBatchSize);
}
std::size_t mBatchSize, mPos;
std::vector<std::pair<InputStreamElement, bool>> mBuf;
};
......
......@@ -366,11 +366,13 @@ private:
* a reference to the lock protecting the aggregation state
*/
void produceAggregates(const Timestamp& timestamp, const bool outdated, const Lock& lock) {
/*
for (const auto groupEntry : mAggregateTable) {
const AggregateStatePtr& aggrState = groupEntry.second;
//produceAggregate(aggrState, timestamp, outdated, lock);
produceAggregate(aggrState, timestamp, outdated, lock);
}
*/
}
/**
......
......@@ -71,6 +71,8 @@ namespace pfabric {
unsigned long start() {
unsigned long ntuples = 0;
assert(mTable.get() != nullptr);
auto iter = mPredicate == nullptr ? mTable->select() : mTable->select(mPredicate);
for (; iter.isValid(); iter++) {
auto tup = *iter;
......
......@@ -108,8 +108,9 @@ namespace pfabric {
auto key = mKeyFunc(data);
if (outdated)
mTable->deleteByKey(key);
else
else {
mTable->insert(key, *data);
}
if (mAutoCommit) {
// TODO: perform commit
}
......
......@@ -64,7 +64,7 @@ public:
HashMapIterator operator++(int) { auto tmp = *this; ++(*this); return tmp; }
bool isValid() const { return i != end; }
SmartPtr<RecordType> operator*() {
SmartPtr<RecordType> operator*() {
return SmartPtr<RecordType> (new RecordType(i->second));
}
// typename Iter::value_type::second_type* operator->() { return &i->second; }
......@@ -130,10 +130,12 @@ public:
~HashMapTable() {}
/**
* @brief Insert a tuple.
* @brief Insert or update a tuple.
*
* Insert the given tuple @rec with the given key into the table. After the insert
* all observers are notified.
* Insert or update the given tuple @rec with the given key into the table.
* If the key already exists then the tuple in the table is updated, otherwise
* the tuple is newly inserted.
* After the insert/update all observers are notified.
*
* @param key the key value of the tuple
* @param rec the actual tuple
......@@ -142,6 +144,11 @@ public:
{
// make sure we have exclusive access
std::lock_guard<std::mutex> lock(mMtx);
auto iter = mDataTable.find(key);
if (iter != mDataTable.end())
// we erase the key/tuple pair first, because of the missing copy assignment
// operator in Tuple we cannot simply use the operator[]
mDataTable.erase(key);
mDataTable.insert({key, rec});
}
// after the lock is released we can inform our observers
......@@ -427,4 +434,3 @@ private:
}
#endif
......@@ -154,6 +154,8 @@ class RDBTable : public BaseTable {
//< setting the bool component of @c UpdateResult to false)
typedef std::function<bool(RecordType&)> UpdelFunc;
typedef std::function<RecordType()> InsertFunc;
//< typedef for a callback function which is invoked when the table was
// updated
typedef boost::signals2::signal<void(const RecordType&,
......@@ -195,11 +197,12 @@ class RDBTable : public BaseTable {
// Table(const TableInfo& tInfo) : BaseTable(tInfo) {}
/**
* @brief Insert a tuple.
* @brief Insert or update a tuple.
*
* Insert the given tuple @rec with the given key into the table. After the
* insert
* all observers are notified.
* Insert or update the given tuple @rec with the given key into the table.
* If the key already exists then the tuple in the table is updated, otherwise
* the tuple is newly inserted.
* After the insert/update all observers are notified.
*
* @param key the key value of the tuple
* @param rec the actual tuple
......@@ -213,6 +216,7 @@ class RDBTable : public BaseTable {
rocksdb::Slice(reinterpret_cast<const char*>(buf.data()),
buf.size()));
if (status.ok()) numRecords++;
// TODO: what happens in case of replacing a key???
}
// after the lock is released we can inform our observers
notifyObservers(rec, TableParams::Insert, TableParams::Immediate);
......@@ -289,7 +293,7 @@ class RDBTable : public BaseTable {
* or deleted (=false)
* @return the number of modified tuples
*/
unsigned long updateOrDeleteByKey(KeyType key, UpdelFunc ufunc) {
unsigned long updateOrDeleteByKey(KeyType key, UpdelFunc ufunc, InsertFunc ifunc = nullptr) {
auto keySlice = pfabric::detail::valToSlice(key);
std::string resultData;
auto status = db->Get(readOptions, keySlice, &resultData);
......@@ -304,7 +308,7 @@ class RDBTable : public BaseTable {
auto res = ufunc(rec);
// check whether we have to perform an update ...
if (!res) {
if (res) {
StreamType buf;
rec.serializeToStream(buf);
......@@ -322,6 +326,13 @@ class RDBTable : public BaseTable {
notifyObservers(rec, mode, TableParams::Immediate);
return num;
}
else {
// key doesn't exist
if (ifunc != nullptr) {
insert(key, ifunc());
return 1;
}
}
return 0;
}
......
......@@ -35,6 +35,9 @@ std::string TableInfo::typeSignature() const {
case ColumnInfo::String_Type:
os << "S";
break;
case ColumnInfo::UInt_Type:
os << "u";
break;
}
}
os << "]";
......@@ -64,6 +67,9 @@ std::ostream& operator<<(std::ostream& os, pfabric::ColumnInfo::ColumnType ct) {
case ColumnInfo::String_Type:
os << "std::string";
break;
case ColumnInfo::UInt_Type:
os << "unsigned int";
break;
}
return os;
}
......@@ -9,7 +9,7 @@
namespace pfabric {
struct ColumnInfo {
enum ColumnType { Void_Type, Int_Type, Double_Type, String_Type };
enum ColumnType { Void_Type, Int_Type, Double_Type, String_Type, UInt_Type };
ColumnInfo(const std::string& n, ColumnType ct) : mColName(n), mColType(ct) {}
......
......@@ -44,7 +44,7 @@ if (BUILD_TEST_CASES)
do_test(BarrierTest)
do_test(StreamGeneratorTest)
do_test(TuplifierTest)
# do_test(BPTreeTest)
do_test(BPTreeTest)
if (USE_ROCKSDB_TABLE)
do_test(RocksDBTest)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment