Commit 53e44dcb authored by Kai-Uwe Sattler's avatar Kai-Uwe Sattler
Browse files

docs updated

parent b7b22e1d
......@@ -299,14 +299,14 @@ auto s = t.newStreamFromFile("data.csv")
#### assignTimestamps ####
`Pipe<T> Pipe::assignTimestamps(typename Window<T>::TimestampExtractorFunc func)`
`Pipe<T> Pipe::assignTimestamps()`
`Pipe<T> Pipe::assignTimestamps<N>()`
This operator sets a timestamp on incoming tuples, used for window or outdated calculations. The first method uses `func` on tuple type `T` as
a function to calculate the corresponding timestamp. The second method uses a specified column as timestamp.
This operator assigns a timestamp to incoming tuples, used for window or outdated calculations. The first method uses `func` on tuple type `T` as
a function to calculate the corresponding timestamp. The second method uses a specified column `N` (a number between 0 and NumColumns - 2) as timestamp.
The following two examples show the usage of this operator (with same results on different syntax), both setting the second attribute as timestamp.
```
```C++
typedef TuplePtr<Tuple<int,std::string,int>> Tin;
Topology t;
......@@ -315,7 +315,7 @@ auto s = t.newStreamFromFile("data.csv")
.assignTimestamps([](auto tp) { return get<1>(tp); })
```
```
```C++
typedef TuplePtr<Tuple<int,std::string,int>> Tin;
Topology t;
......@@ -344,13 +344,46 @@ separate thread to retrieve tuples from the queue and sent them downstream. In t
#### aggregate ####
`Pipe<Tout> Pipe::aggregate(tType, tInterval)`
`Pipe<Tout> Pipe::aggregate(finalFun, iterFun, tType, tInterval)`
`Pipe<Tout> Pipe::aggregate<Tout, State>(tType, tInterval)`
`Pipe<Tout> Pipe::aggregate<Tout, State>(finalFun, iterFun, tType, tInterval)`
calculates a set of aggregates over the stream of type `Tin`, possibly supported by a window. The aggregates a represented by tuples
of type `Tout`. `aggregate` needs a helper class of type parameter `State`. The parameters are the mode `m` for triggering the
aggregate calculation, and an optional interval time for producing aggregate values (`interval`).
As long as only standard aggregate functions (e.g. sum, count, avg etc.) are used the state class can be implemented
using the predefined `AggregatorN` template where `N` denotes the number of aggregate functions. The following
example creates a state class for computing the average, the count, and the sum on a stream of tuples conisting
of `int` values:
```C++
typedef TuplePtr<Tuple<int>> InTuple;
typedef Aggregator3<
InTuple, i // the input type for the aggregation
AggrAvg<int, int>,// an aggregate for calculating the average (input column type = int, output type = int)
0, // the values to be aggregated are taken from column 0
AggrCount<int>, // an aggregate for counting values (result type = int)
0, // again, we count values in column 0
AggrSum<int> // an aggregate for calculating the sum of int values
0 // column 0 again
> AggrState;
```
With the help of this state class, `aggregate` can be used. However, we have to define the ouput type of the
aggregate which consists in this case of three `int` values (for avg, count, sum):
```C++
tyepdef TuplePtr<Tuple<int, int, int>> OutTuple;
Topology t;
auto s = t.newStreamFromFile("data.csv")
.extract<InTuple>(',')
.aggregate<OutTuple, AggrState>()
...
```
In case you need more advanced aggregations going beyond the standard aggregates you still can implement your
own aggregation state class. This class has to provide an `iterate` function that is called for each input
tuple and a `finalize` function for producing the final result tuple.
#### groupBy #####
......@@ -362,7 +395,24 @@ aggregate calculation, and an optional interval time for producing aggregate val
#### notify ####
`Pipe<T> Pipe::notify(func, pfunc)`
`Pipe<T> Pipe::notify(std::function<void(const T&, bool)> func, std::function<void(const PunctuationPtr&)> pfunc)`
Notify is an operator for triggering callbacks. It forwards all tuples to its subscribers and invokes
the callback function `func` for each tuple as well as the (optional) punctuation callback `pfunc` for each punctuation
tuple. Note that the tuple cannot be modified for the stream (use `map` instead) and beware potential race
conditions if you modify a global state in the callback functions.
```C++
typedef TuplePtr<Tuple<int,std::string,int>> Tin;
Topology t;
auto s = t.newStreamFromFile("data.csv")
.extract<Tin>(',')
.notify(auto tp, bool outdated) {
auto v = get<0>(tp);
std::cout << "---> " << v << std::endl;
});
```
#### partitionBy ####
......@@ -372,6 +422,9 @@ aggregate calculation, and an optional interval time for producing aggregate val
`Pipe<T> Pipe::merge()`
The `merge` operator is used only in combination with `partitionBy` to combine the partitioned stream into
a single one. See above for an example.
#### barrier ####
`Pipe<T> Pipe::barrier(cVar, mtx, f)`
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment