Commit 44d2d981 authored by Kai-Uwe Sattler's avatar Kai-Uwe Sattler
Browse files

WindowFunc modified

parent 96c97fb7
......@@ -32,6 +32,12 @@ PFabricContext::TopologyPtr PFabricContext::createTopology() {
return std::make_shared<Topology>();
}
bool PFabricContext::tableExists(const std::string& tblName) const {
// look for the table
auto it = mTableSet.find(tblName);
return (it != mTableSet.end());
}
TableInfoPtr PFabricContext::getTableInfo(const std::string& tblName) {
// look for the table
auto it = mTableSet.find(tblName);
......
......@@ -141,6 +141,8 @@ public:
return std::shared_ptr<Table<RecordType, KeyType>>();
}
bool tableExists(const std::string& tblName) const;
TableInfoPtr getTableInfo(const std::string& tblName);
/**
......
......@@ -155,14 +155,11 @@ namespace pfabric {
this->getOutputDataChannel().publish(data, outdated);
} else {
// if function available
if(this->mWindowOpFunc!=nullptr) {
// apply function on tuple
auto res = this->mWindowOpFunc(data);
// insert the tuple into buffer
if(this->mWindowOpFunc != nullptr) {
// insert the tuple into buffer
{ //necessary for lock scope!
std::lock_guard<std::mutex> guard(this->mMtx);
this->mTupleBuf.push_back(res);
this->mTupleBuf.push_back(data);
this->mCurrSize++;
}
......@@ -170,6 +167,9 @@ namespace pfabric {
if (!this->mEvictThread) {
this->mEvictFun();
}
// apply the window function - we do this after the window was updated
auto res = this->mWindowOpFunc(this->mTupleBuf.begin(),
this->mTupleBuf.end(), data);
// finally, forward the incoming tuple
this->getOutputDataChannel().publish(res, outdated);
......
......@@ -131,18 +131,19 @@ namespace pfabric {
}
else {
// if function available
if(this->mWindowOpFunc!=nullptr) {
// apply function on tuple
auto res = this->mWindowOpFunc(data);
if(this->mWindowOpFunc != nullptr) {
// insert the tuple into buffer
{ //necessary for lock scope!
std::lock_guard<std::mutex> guard(this->mMtx);
this->mTupleBuf.push_back(res);
this->mTupleBuf.push_back(data);
this->mCurrSize++;
}
//forward the incoming tuple
// apply function on tuple
auto res = this->mWindowOpFunc(this->mTupleBuf.begin(),
this->mTupleBuf.end(), data);
//forward the incoming tuple
this->getOutputDataChannel().publish(res, outdated);
// check for outdated tuples
......
......@@ -71,9 +71,17 @@ namespace pfabric {
public UnaryTransform< StreamElement, StreamElement > // use default unary transform
{
public:
typedef typename std::list<StreamElement>::const_iterator ElementIterator;
typedef std::function<Timestamp(const StreamElement&)> TimestampExtractorFunc;
typedef std::function<StreamElement(const StreamElement&)> WindowOpFunc; //lambda function applied to incoming tuples in window
/**
* An optional function that can be applied to the entire window
* when a new tuple arrives.
*/
typedef std::function<StreamElement(ElementIterator beg,
ElementIterator end,
const StreamElement&)> WindowOpFunc;
protected:
PFABRIC_UNARY_TRANSFORM_TYPEDEFS(StreamElement, StreamElement);
......
......@@ -340,7 +340,7 @@ TEST_CASE("Tuplifying a stream of RDF strings", "[Tuplifier]") {
Topology t;
auto s = t.newStreamFromFile(std::string(TEST_DATA_DIRECTORY) + "tuplifier_test1.in")
.extract<Triple>(',')
.tuplify<RDFTuple>({ "http://data.org/name", "http://data.org/price", "http://data.org/someOther" },
.tuplify<RDFTuple>({ "http://data.org/name", "http://data.org/price", "http://data.org/someOther" },
TuplifierParams::ORDERED)
.notify([&](auto tp, bool outdated) {
std::lock_guard<std::mutex> lock(r_mutex);
......@@ -373,7 +373,7 @@ TEST_CASE("Using a window with and without additional function", "[Window]") {
std::stringstream strm2;
expected = "1.5\n103\n304.5\n604.5\n904.5\n1204.5\n1504.5\n1804.5\n2104.5\n2404.5\n";
auto winFunc = [](auto tp) { get<2>(tp)++; return tp; }; //just increment incoming tuples double-attribute by one
auto winFunc = [](auto beg, auto end, auto tp) { get<2>(tp)++; return tp; }; //just increment incoming tuples double-attribute by one
Topology t2;
auto s2 = t2.newStreamFromFile("file.csv")
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment