Commit d0844ad4 authored by Philipp Götze's avatar Philipp Götze
Browse files

Changed Timestamp to chrono duration +++ adapted Window and Aggregation...

Changed Timestamp to chrono duration +++ adapted Window and Aggregation accordingly +++ fixed two bugs in window +++ added some window-based aggregation test cases
parent c1c9cfb3
......@@ -47,7 +47,7 @@ typedef std::size_t AttributeIdx;
* Typedef for the timestamps associated with each tuple. A timestamp represents the number
* of microseconds since 01/01/1970 indicating when the tuple arrived in the system.
*/
typedef unsigned long long Timestamp;
typedef std::chrono::microseconds Timestamp;
/// a number for limiting the number of produced tuples
typedef std::size_t TupleLimit;
......
......@@ -17,7 +17,6 @@
* along with PipeFabric. If not, see <http://www.gnu.org/licenses/>.
*/
#include "TimestampHelper.hpp"
#include "Punctuation.hpp"
using namespace pfabric;
......
......@@ -38,13 +38,13 @@ Timestamp TimestampHelper::stringToTimestamp(const std::string& date) {
/* TODO Handle error */
}
// TODO: strptime ignores the milliseconds:
auto offs1 = date.find('.') + 1;
auto offs2 = date.find('+', offs1);
unsigned long milliseconds = std::stoi(date.substr(offs1, offs2 - offs1));
const auto offs1 = date.find('.') + 1;
const auto offs2 = date.find('+', offs1);
const unsigned long milliseconds = std::stoi(date.substr(offs1, offs2 - offs1));
//tm.tm_isdst = -1; // to determine whether daylight saving time is considered
// Timestamp t = time_to_epoch(&tm, -1); //faster than mktime but wrong!
Timestamp t = timegm(&tm);
return t * 1000000 + milliseconds * 1000;
const auto t = timegm(&tm);
return Timestamp(t * 1000000 + milliseconds * 1000);
}
Timestamp TimestampHelper::parseTimestamp(const std::string& val) {
......@@ -60,7 +60,7 @@ Timestamp TimestampHelper::parseTimestamp(const std::string& val) {
boost::regex iso_string_expr_secs("^[0-9]{8}T[0-9]{6}$");
boost::regex iso_string_expr_msecs("^[0-9]{8}T[0-9]{6}.[0-9]{6}$");
Timestamp result = 0;
Timestamp result;
if (timetype == Unknown) {
if (regex_match(val.c_str(), unix_expr)) {
......@@ -75,33 +75,35 @@ Timestamp TimestampHelper::parseTimestamp(const std::string& val) {
else {
BOOST_LOG_TRIVIAL(warning) << "could not identify timestamp type of \"" << val << "\". use internal.";
}
}
switch(timetype) {
case Unix:
result = atol(val.c_str()) * 1000; // NOTE: we assume milliseconds here!!!
break;
case String:
result = timestampFromTime(boost::posix_time::time_from_string(val));
break;
case ISOString:
result = timestampFromTime(boost::posix_time::from_iso_string(val));
break;
case Unknown:
try {
std::stringstream s;
s.exceptions (std::ios_base::failbit);
s.str(val);
s >> result;
}
catch (std::exception& e) {
BOOST_LOG_TRIVIAL(warning) << "unable to parse timestamp \"" << val << "\":\n" << e.what();
}
catch ( ... ) {
BOOST_LOG_TRIVIAL(warning) << "unable to parse timestamp \"" << val << "\". use internal.";
}
break;
}
return result;
}
switch(timetype) {
case Unix:
result = Timestamp(atol(val.c_str()) * 1000); // NOTE: we assume milliseconds here!!!
break;
case String:
result = timestampFromTime(boost::posix_time::time_from_string(val));
break;
case ISOString:
result = timestampFromTime(boost::posix_time::from_iso_string(val));
break;
case Unknown:
try {
std::stringstream s;
s.exceptions (std::ios_base::failbit);
s.str(val);
unsigned long long t = 0;
s >> t;
result = Timestamp(t);
}
catch (std::exception& e) {
BOOST_LOG_TRIVIAL(warning) << "unable to parse timestamp \"" << val << "\":\n" << e.what();
}
catch ( ... ) {
BOOST_LOG_TRIVIAL(warning) << "unable to parse timestamp \"" << val << "\". use internal.";
}
break;
}
return result;
}
} /* end namespace pquery */
......@@ -48,14 +48,15 @@ struct TimestampHelper {
* Returns the current system time as timestamp (microseconds since 01/01/1970).
*/
static inline Timestamp timestampFromCurrentTime() {
return (boost::posix_time::microsec_clock::local_time() - UNIX_EPOCH).total_microseconds();
const auto duration = std::chrono::system_clock::now().time_since_epoch();
return std::chrono::duration_cast<std::chrono::microseconds>(duration);
}
/**
* Converts the given POSIX time into a timestamp (microseconds since 01/01/1970).
*/
static inline Timestamp timestampFromTime(const boost::posix_time::ptime& tm) {
return (tm - UNIX_EPOCH).total_microseconds();
return Timestamp((tm - UNIX_EPOCH).total_microseconds());
}
/**
......@@ -65,12 +66,16 @@ struct TimestampHelper {
/**
* Returns the given timestamp as a POSIX time value.
* Inspired by: https://stackoverflow.com/a/4918873
*/
static inline boost::posix_time::ptime timestampToPtime(Timestamp ts) {
return boost::posix_time::ptime(
boost::gregorian::date(1970,1,1),
boost::posix_time::time_duration(0, 0, 0, ts)
);
using duration_t = std::chrono::nanoseconds;
const auto d = std::chrono::duration_cast<duration_t>(ts).count();
const auto sec = d / (1000 * 1000 * 1000);
const auto nsec = d % (1000 * 1000 * 1000);
return boost::posix_time::from_time_t(0)+
boost::posix_time::seconds(static_cast<long>(sec))+
boost::posix_time::microseconds((nsec+500)/1000);
}
/**
......@@ -79,7 +84,9 @@ struct TimestampHelper {
static Timestamp stringToTimestamp(const std::string& date);
// 1000 microseconds * 60 seconds * 60 minutes * 24 hours
static unsigned int toDays(Timestamp ts) { return ts / 86400000; }
static unsigned int toDays(Timestamp ts) {
return std::chrono::duration_cast<std::chrono::hours>(ts).count() / 24;
}
/**
* TODO
......
......@@ -21,7 +21,6 @@
#define Pipe_hpp_
#include <string>
#include <typeinfo>
#include <boost/any.hpp>
......@@ -396,8 +395,7 @@ class Pipe {
*/
Pipe<T> assignTimestamps(typename Window<T>::TimestampExtractorFunc func) {
return Pipe<T>(dataflow, tailIter, keyExtractor, func, transactionIDExtractor,
partitioningState,
numPartitions);
partitioningState, numPartitions);
}
/**
......@@ -416,11 +414,10 @@ class Pipe {
template <int N>
Pipe<T> assignTimestamps() {
std::function<Timestamp(const T&)> func = [](const T& tp) -> Timestamp {
return getAttribute<N>(tp);
return Timestamp(getAttribute<N>(tp) * 1000 * 1000); //< default interpretation as seconds
};
return Pipe<T>(dataflow, tailIter, keyExtractor, func, transactionIDExtractor,
partitioningState,
numPartitions);
partitioningState, numPartitions);
}
/**
......@@ -481,7 +478,7 @@ class Pipe {
return Pipe<T>(dataflow, iter, keyExtractor, timestampExtractor, transactionIDExtractor,
partitioningState, numPartitions);
}
} catch (boost::bad_any_cast& e) {
} catch (const boost::bad_any_cast& e) {
throw TopologyException(
"No TimestampExtractor defined for slidingWindow.");
}
......@@ -540,7 +537,7 @@ class Pipe {
return Pipe<T>(dataflow, iter, keyExtractor, timestampExtractor, transactionIDExtractor,
partitioningState, numPartitions);
}
} catch (boost::bad_any_cast& e) {
} catch (const boost::bad_any_cast& e) {
throw TopologyException(
"No TimestampExtractor defined for tumblingWindow.");
}
......@@ -1103,23 +1100,28 @@ class Pipe {
typename Aggregation<T, Tout, AggrState>::IterateFunc iterFun,
AggregationTriggerType tType = TriggerAll,
const unsigned int tInterval = 0) noexcept(false) {
using AggrType = Aggregation<T, Tout, AggrState>;
if (!timestampExtractor.empty()) tType = TriggerByTimestamp;
if (partitioningState == NoPartitioning) {
auto op = std::make_shared<Aggregation<T, Tout, AggrState>>(
finalFun, iterFun, tType, tInterval);
auto iter =
addPublisher<Aggregation<T, Tout, AggrState>, DataSource<T>>(op);
return Pipe<Tout>(dataflow, iter, keyExtractor, timestampExtractor, transactionIDExtractor,
partitioningState, numPartitions);
std::shared_ptr<AggrType> op;
if (tType == TriggerByTimestamp) {
using ExtractorFunc = typename AggrType::TimestampExtractorFunc;
auto fn = boost::any_cast<ExtractorFunc>(timestampExtractor);
op = std::make_shared<AggrType>(finalFun, iterFun, fn, tInterval);
} else {
op = std::make_shared<AggrType>(finalFun, iterFun, tType, tInterval);
}
auto iter = addPublisher<AggrType, DataSource<T>>(op);
return Pipe<Tout>(dataflow, iter, keyExtractor, timestampExtractor,
transactionIDExtractor, partitioningState, numPartitions);
} else {
std::vector<std::shared_ptr<Aggregation<T, Tout, AggrState>>> ops;
std::vector<std::shared_ptr<AggrType>> ops;
for (auto i = 0u; i < numPartitions; i++) {
ops.push_back(std::make_shared<Aggregation<T, Tout, AggrState>>(
finalFun, iterFun, tType, tInterval));
ops.push_back(std::make_shared<AggrType>(finalFun, iterFun, tType, tInterval));
}
auto iter =
addPartitionedPublisher<Aggregation<T, Tout, AggrState>, T>(ops);
return Pipe<Tout>(dataflow, iter, keyExtractor, timestampExtractor, transactionIDExtractor,
partitioningState, numPartitions);
auto iter = addPartitionedPublisher<AggrType, T>(ops);
return Pipe<Tout>(dataflow, iter, keyExtractor, timestampExtractor,
transactionIDExtractor, partitioningState, numPartitions);
}
}
......@@ -1255,7 +1257,7 @@ class Pipe {
return Pipe<Tout>(dataflow, iter, keyExtractor, timestampExtractor, transactionIDExtractor,
partitioningState, numPartitions);
}
} catch (boost::bad_any_cast& e) {
} catch (const boost::bad_any_cast& e) {
throw TopologyException("No KeyExtractor defined for groupBy.");
}
}
......@@ -1297,7 +1299,7 @@ class Pipe {
return Pipe<Tout>(dataflow, iter, keyExtractor, timestampExtractor, transactionIDExtractor,
partitioningState, numPartitions);
}
} catch (boost::bad_any_cast& e) {
} catch (const boost::bad_any_cast& e) {
throw TopologyException("No KeyExtractor defined for groupBy.");
}
}
......@@ -1464,7 +1466,7 @@ class Pipe {
return Pipe<Tout>(dataflow, iter, keyExtractor, timestampExtractor, transactionIDExtractor,
partitioningState, numPartitions);
}
} catch (boost::bad_any_cast& e) {
} catch (const boost::bad_any_cast& e) {
throw TopologyException("No KeyExtractor defined for join.");
}
}
......@@ -1559,7 +1561,7 @@ class Pipe {
return Pipe<Tout>(dataflow, iter, keyExtractor, timestampExtractor,
transactionIDExtractor, partitioningState, numPartitions);
} catch (boost::bad_any_cast& e) {
} catch (const boost::bad_any_cast& e) {
throw TopologyException("No KeyExtractor defined for join.");
}
}
......@@ -1605,7 +1607,7 @@ class Pipe {
auto iter = addPublisher<ToTxTable<T, KeyType>, DataSource<T>>(op);
return Pipe<T>(dataflow, iter, keyExtractor, timestampExtractor, transactionIDExtractor,
partitioningState, numPartitions);
} catch (boost::bad_any_cast& e) {
} catch (const boost::bad_any_cast& e) {
throw TopologyException("No KeyExtractor or TransactionIDExtractor defined for toTxTable.");
}
}
......@@ -1624,7 +1626,7 @@ class Pipe {
auto iter = addPublisher<ToTable<T, KeyType>, DataSource<T>>(op);
return Pipe<T>(dataflow, iter, keyExtractor, timestampExtractor, transactionIDExtractor,
partitioningState, numPartitions);
} catch (boost::bad_any_cast& e) {
} catch (const boost::bad_any_cast& e) {
throw TopologyException("No KeyExtractor defined for toTable.");
}
}
......@@ -1687,7 +1689,7 @@ class Pipe {
});
return tp;
});
} catch (boost::bad_any_cast& e) {
} catch (const boost::bad_any_cast& e) {
throw TopologyException("No KeyExtractor defined for updateTable.");
}
}
......
......@@ -23,6 +23,8 @@
#include <thread>
#include <mutex>
#include <boost/variant.hpp>
#include "core/Tuple.hpp"
#include "core/Punctuation.hpp"
#include "qop/AggregateFunctions.hpp"
......@@ -62,28 +64,28 @@ namespace pfabric {
public:
/**
* Typedef for pointer to the aggregation state
* Alias for pointer to the aggregation state
*/
typedef std::shared_ptr<AggregateState> AggregateStatePtr;
using AggregateStatePtr = std::shared_ptr<AggregateState>;
/**
* Typedef for a function to extract the timestamp from a tuple
* Alias for a function to extract the timestamp from a tuple
*/
typedef std::function<Timestamp(const InputStreamElement&)> TimestampExtractorFunc;
using TimestampExtractorFunc = std::function<Timestamp(const InputStreamElement&)>;
/**
* @brief The aggregation function which produces the final (or periodic) aggregation result.
*
* This function gets a pointer to the aggregate state as well as the timestamp for the result elment.
*/
typedef std::function< OutputStreamElement(AggregateStatePtr) > FinalFunc;
using FinalFunc = std::function<OutputStreamElement(AggregateStatePtr)>;
/**
* @brief The function which is invoked for each incoming stream element to calculate the incremental aggregates.
*
* This function gets the incoming stream element, the aggregate state, and the boolean flag for outdated elements.
*/
typedef std::function< void(const InputStreamElement&, AggregateStatePtr, const bool) > IterateFunc;
using IterateFunc = std::function<void(const InputStreamElement&, AggregateStatePtr, const bool)>;
/**
......@@ -142,21 +144,27 @@ namespace pfabric {
*/
Aggregation(FinalFunc final_fun, IterateFunc it_fun,
TimestampExtractorFunc func, const unsigned int tInterval) :
mAggrState(std::make_shared<AggregateState>()),
mIterateFunc( it_fun ), mFinalFunc( final_fun ),
mTimestampExtractor(func),
mTriggerType(TriggerByTimestamp), mTriggerInterval( tInterval ),
mNotifier(nullptr), mLastTriggerTime(0), mCounter(0) {
}
Aggregation(std::make_shared<AggregateState>(), final_fun, it_fun, func, Timestamp(tInterval * 1000 * 1000)) {}
Aggregation(AggregateStatePtr state, FinalFunc final_fun, IterateFunc it_fun,
TimestampExtractorFunc func, const unsigned int tInterval) :
Aggregation(state, final_fun, it_fun, func, Timestamp(tInterval * 1000 * 1000)) {}
template<class Rep, class Period = std::ratio<1>>
Aggregation(FinalFunc final_fun, IterateFunc it_fun,
TimestampExtractorFunc func, const std::chrono::duration<Rep, Period> tInterval) :
Aggregation(std::make_shared<AggregateState>(), final_fun, it_fun, func, tInterval) {}
template<class Rep, class Period = std::ratio<1>>
Aggregation(AggregateStatePtr state, FinalFunc final_fun, IterateFunc it_fun,
TimestampExtractorFunc func, const std::chrono::duration<Rep, Period> tInterval) :
mAggrState(state),
mIterateFunc( it_fun ), mFinalFunc( final_fun ),
mTimestampExtractor(func),
mTriggerType(TriggerByTimestamp), mTriggerInterval( tInterval ),
mNotifier(nullptr), mLastTriggerTime(0), mCounter(0) {
}
mTimestampExtractor(func), mNotifier(nullptr), mLastTriggerTime(0),
mTriggerType(TriggerByTimestamp),
mTriggerInterval(std::chrono::duration_cast<Timestamp>(tInterval)),
mCounter(0) {}
/**
* @brief Bind the callback for the data channel.
*/
......@@ -187,37 +195,39 @@ namespace pfabric {
// the actual aggregation is outsourced to a user-defined expression
mIterateFunc(data, mAggrState, outdated);
switch (mTriggerType) {
case TriggerAll:
{
// produce an aggregate tuple
auto tn = mFinalFunc(mAggrState);
this->getOutputDataChannel().publish( tn, outdated );
break;
}
case TriggerByCount:
{
if (++mCounter == mTriggerInterval) {
myLock.unlock(); // we have to unlock here: produceAggregate will acquire its own lock
notificationCallback();
mCounter = 0;
return;
if (!outdated) {
switch (mTriggerType) {
case TriggerAll: {
// produce an aggregate tuple
auto tn = mFinalFunc(mAggrState);
this->getOutputDataChannel().publish( tn, outdated );
break;
}
break;
}
case TriggerByTimestamp:
{
auto ts = mTimestampExtractor(data);
if (ts - mLastTriggerTime >= mTriggerInterval) {
myLock.unlock();
notificationCallback();
mLastTriggerTime = ts;
return;
case TriggerByCount: {
if (++mCounter == boost::get<unsigned int>(mTriggerInterval)) {
myLock.unlock(); // we have to unlock here: produceAggregate will acquire its own lock
notificationCallback();
mCounter = 0;
return;
}
break;
}
case TriggerByTimestamp: {
const auto ts = mTimestampExtractor(data);
if (ts - mLastTriggerTime >= boost::get<Timestamp>(mTriggerInterval)) {
myLock.unlock();
notificationCallback();
mLastTriggerTime = ts;
return;
}
break;
}
break;
default:
break;
}
default:
break;
} else {
auto aggregationResult = mFinalFunc( mAggrState );
this->getOutputDataChannel().publish( aggregationResult, true );
}
myLock.unlock();
}
......@@ -233,11 +243,13 @@ namespace pfabric {
*/
void processPunctuation( const PunctuationPtr& punctuation ) {
// if we receive a punctuation on expired slides we produce aggregates
//TODO: already handled by notificationCallback?...
/*
if( punctuation->ptype() == Punctuation::EndOfStream
|| punctuation->ptype() == Punctuation::WindowExpired
|| punctuation->ptype() == Punctuation::SlideExpired ) {
produceAggregates();
}
}*/
this->getOutputPunctuationChannel().publish(punctuation);
}
......@@ -264,20 +276,22 @@ namespace pfabric {
this->getOutputPunctuationChannel().publish(punctuation);
}
TimestampExtractorFunc mTimestampExtractor; //< a pointer to the function for extracting
//< the timestamp from the tuple
using IntervalType = boost::variant<Timestamp, unsigned int>;
AggregateStatePtr mAggrState; //< a pointer to the object representing the aggregation state
mutable std::mutex aggrMtx; //< a mutex for synchronizing access between
//< the trigger notifier thread and aggregation operator
IterateFunc mIterateFunc; //< a pointer to the iteration function called for each tuple
FinalFunc mFinalFunc; //< a pointer to a function computing the final
//< (or periodical) aggregates
TimestampExtractorFunc mTimestampExtractor; //< a pointer to the function for extracting
//< the timestamp from the tuple
std::unique_ptr<TriggerNotifier> mNotifier; //< the notifier object which triggers the
//< computation of aggregates periodically
Timestamp mLastTriggerTime; //< the timestamp of the last aggregate publishing
AggregationTriggerType mTriggerType; //< the type of trigger activating the publishing
//< of an aggregate value
unsigned int mTriggerInterval; //< the interval (time in seconds, number of tuples)
IntervalType mTriggerInterval; //< the interval (time in seconds, number of tuples)
//< for publishing aggregates
unsigned int mCounter; //< the number of tuples processed since the
//< last aggregate publishing
......
......@@ -270,7 +270,7 @@ private:
switch (mTriggerType) {
case TriggerByCount:
{
if (++mCounter == mTriggerInterval) {
if (++mCounter == boost::get<unsigned int>(mTriggerInterval)) {
notificationCallback();
mCounter = 0;
}
......@@ -279,7 +279,7 @@ private:
case TriggerByTimestamp:
{
auto ts = mTimestampExtractor(data);
if (ts - mLastTriggerTime >= mTriggerInterval) {
if (ts - mLastTriggerTime >= boost::get<Timestamp>(mTriggerInterval)) {
notificationCallback();
mLastTriggerTime = ts;
}
......@@ -326,7 +326,7 @@ private:
*/
void processNewAggregationGroup(KeyType grpKey, const InputStreamElement& data, const Lock& lock) {
const bool outdated = false;
const Timestamp elementTime = mTimestampExtractor != nullptr ? mTimestampExtractor(data) : 0;
const Timestamp elementTime = mTimestampExtractor != nullptr ? mTimestampExtractor(data) : Timestamp(0);
// create a new aggregation state
// if a factory object was provided we can call its create method
......@@ -367,7 +367,7 @@ private:
auto groupEntry = mAggregateTable.find(grpKey);
AggregateStatePtr aggrState = groupEntry->second;
const Timestamp elementTime = mTimestampExtractor != nullptr ? mTimestampExtractor(data) : 0;
const Timestamp elementTime = mTimestampExtractor != nullptr ? mTimestampExtractor(data) : Timestamp(0);
/* 1. Send the previous aggregated state as outdated
if (mTriggerType == TriggerAll) {
produceAggregate(aggrState, elementTime, true, lock);
......@@ -456,21 +456,23 @@ protected:
}
private:
HashTable mAggregateTable; //< a hash table for storing the aggregation states for each group
TimestampExtractorFunc mTimestampExtractor; //!< a pointer to the function for extracting the timestamp from the tuple
//!< for each group at runtime
mutable AggregationMutex mAggrMtx; //!< a mutex for synchronizing access between the trigger notifier thread
//!< and aggregation operator
GroupByFunc mGroupByFunc; //!< a pointer to the function determining the key value for the group
IterateFunc mIterateFunc; //!< a pointer to the iteration function called for each tuple
FinalFunc mFinalFunc; //!< a pointer to a function computing the final (or periodical) aggregates
unsigned int mTriggerInterval; //!< the interval (time in seconds, number of tuples) for publishing aggregates
std::unique_ptr<TriggerNotifier> mNotifier; //!< the notifier object which triggers the computation of aggregates periodically
Timestamp mLastTriggerTime; //!< the timestamp of the last aggregate publishing
AggregationTriggerType mTriggerType; //!< the type of trigger activating the publishing of an aggregate value
unsigned int mCounter; //!< the number of tuples processed since the last aggregate publishing
AggregateStatePtr mFactory;
FactoryFunc mFactoryFunc;
using IntervalType = boost::variant<Timestamp, unsigned int>;
HashTable mAggregateTable; //< a hash table for storing the aggregation states for each group
TimestampExtractorFunc mTimestampExtractor; //!< a pointer to the function for extracting the timestamp from the tuple
//!< for each group at runtime
mutable AggregationMutex mAggrMtx; //!< a mutex for synchronizing access between the trigger notifier thread
//!< and aggregation operator
GroupByFunc mGroupByFunc; //!< a pointer to the function determining the key value for the group
IterateFunc mIterateFunc; //!< a pointer to the iteration function called for each tuple
FinalFunc mFinalFunc; //!< a pointer to a function computing the final (or periodical) aggregates
IntervalType mTriggerInterval; //!< the interval (time in seconds, number of tuples) for publishing aggregates
std::unique_ptr<TriggerNotifier> mNotifier; //!< the notifier object which triggers the computation of aggregates periodically
Timestamp mLastTriggerTime; //!< the timestamp of the last aggregate publishing
AggregationTriggerType mTriggerType; //!< the type of trigger activating the publishing of an aggregate value
unsigned int mCounter; //!< the number of tuples processed since the last aggregate publishing
AggregateStatePtr mFactory;
FactoryFunc mFactoryFunc;
};
} /* end namespace pfabric */
......
......@@ -73,17 +73,30 @@ namespace pfabric {
typename Window<StreamElement>::WindowOpFunc windowFunc = nullptr,
const unsigned int ei = 0) :
WindowBase(func, wt, sz, windowFunc, ei ) {
if (ei == 0) {
// sliding window where the incoming tuple evicts outdated tuples
this->mEvictFun = std::bind( this->mWinType == WindowParams::RangeWindow ?
&SlidingWindow::evictByTime : &SlidingWindow::evictByCount, this
);
}
else {
// sliding window, but we need a thread for evicting tuples
WindowParams::EvictionFunc efun = boost::bind( &SlidingWindow::evictByTime, this );
this->mEvictThread = std::make_unique< EvictionNotifier >( this->mEvictInterval, efun );
}
setupEviction(ei);
}
/**
* @brief Create a new sliding window operator instance with the given parameters.
*
* Create a new sliding window operator of a given window type with a timestamp
* extractor function. This constructor should be mainly used with time-based
* windows (WindowParams::RangeWindow).
*
* @param func a function for extracting the timestamp value from the stream element
* @param wt the type of the window (range or row)
* @param sz the window size (as chrono duration or number of tuples)
* @param windowFunc optional function for modifying incoming tuples
* @param ei ei the eviction interval, i.e., time for triggering the eviction (in milliseconds)
*/
template<class Rep, class Period = std::ratio<1>>
SlidingWindow(typename Window<StreamElement>::TimestampExtractorFunc func,