Commit 85f4e456 authored by Kai-Uwe Sattler's avatar Kai-Uwe Sattler
Browse files

docs updated

parent 257b872f
......@@ -15,9 +15,9 @@ only consisting of a single string attribute (`TStringPtr`). There is an optiona
which is zero on default (that means, read until end of file).
The following example reads a file named `data.csv`. It has to be in the same folder as the runnable code, else use
"\path\to\your\data.csv" to find it.
"/path/to/your/data.csv" to find it.
```
```C++
Topology t;
auto s = t.newStreamFromFile("data.csv")
```
......@@ -33,15 +33,15 @@ The parameters define the TCP port for receiving REST calls (`port`), the URI pa
The following example listens on port 8099 with corresponding interface and posting-method. An external call produces a single tuple,
which is forwarded to the next operator (usually an extract-operator, defined later on).
```
```C++
Topology t;
auto s = t.newStreamFromREST(8099, "^/publish$", RESTSource::POST_METHOD)
```
#### newStreamFromZMQ ####
`Pipe<TStringPtr> Topology::newAsciiStreamFromZMQ(path, stype)`
`Pipe<TBufPtr> Topology::newBinaryStreamFromZMQ(path, stype)`
`Pipe<TStringPtr> Topology::newAsciiStreamFromZMQ(const std::string& path, ZMQParams::SourceType stype)`
`Pipe<TBufPtr> Topology::newBinaryStreamFromZMQ(const std::string& path, ZMQParams::SourceType stype)`
This operator uses the ZeroMQ source to receive tuples via network protocol. The `path` is used to define the network connection endpoint.
The `stype` defines source or sink of ZeroMQ, used as source (publisher) in this context per default. Ascii or Binary are two types of encoding
......@@ -50,7 +50,7 @@ the tuples, choose correspondingly.
The following example constructs a ZeroMQ source as a TCP connection at port 5678, waiting for external calls to produce tuples for
following operators.
```
```C++
Topology t;
auto s = t.newAsciiStreamFromZMQ("tcp://localhost:5678")
```
......@@ -67,7 +67,7 @@ The following example defines first the structure of a tuple `T1` per typedef (s
For every update on `testTable` a tuple with structure `T1` and keytype `long` (key based on first attribute) is forwarded to
following operators.
```
```C++
typedef TuplePtr<Tuple<long, std::string, double>> T1;
Topology t;
......@@ -76,27 +76,27 @@ auto s = t.newStreamFromTable<T1, long>(testTable)
#### fromStream ####
`Pipe<T> Topology::fromStream(stream)`
`Pipe<T> Topology::fromStream(Dataflow::BaseOpPtr stream)`
This operator gets another already processed stream as input. With this operator it is possible to use another (already preprocessed)
stream, for example. `T` represents the tuple type, `stream` is another data stream, possibly defined per `toStream()` operator
This operator takes an existing stream as input. With this operator it is possible to use another (already created)
stream, e.g. to reuse a stream or define a view on it. `T` represents the tuple type, `stream` is another data stream, possibly populated by the `toStream()` operator
explained later on.
The following example defines the structure of a tuple `T1` first, along with a new stream element. Then, tuples are pushed into this
stream by an already existing topology. Finally, the stream is then used in `fromStream`.
The following example registers a new named stream object of type `T1` first. Then, tuples are pushed into this
stream by an already existing topology. Finally, the stream is then used in `fromStream` in another topology.
```
```C++
typedef TuplePtr<Tuple<long, std::string, double>> T1;
PFabricContext ctx;
Dataflow::BaseOpPtr stream = ctx.createStream<T1>("streamName");
Dataflow::BaseOpPtr myStream = ctx.createStream<T1>("streamName");
[...]
.toStream(streamName);
.toStream(myStream);
[...]
Topology t;
auto s = t.fromStream<T1>(streamName)
auto s = t.fromStream<T1>(myStream);
```
#### fromGenerator ####
......@@ -110,7 +110,7 @@ The following example defines a tuple consisting of two numbers. The generator f
both numbers by one for all following tuples, described in a lambda function. `streamFromGenerator` uses this function to produce 1000 tuples,
forwarding them to the next operators.
```
```C++
typedef TuplePtr<Tuple<int, int>> T1;
StreamGenerator<T1>::Generator gen ([](unsigned long n) -> T1 {
......@@ -148,7 +148,7 @@ This operator processes a stream of JSON strings and constructs tuples of type `
The following example reads JSON strings from REST source and extracts them into tuples `Tin`, consisting out of an integer (key) and double
(data).
```
```C++
typedef TuplePtr<Tuple<int, double>> Tin;
auto s = t.newStreamFromREST(8099, "^/publish$", RESTSource::POST_METHOD)
......@@ -165,7 +165,7 @@ The `T` parameter represents the input/output type (a `TuplePtr` type).
The following example reads again tuples from a file called "data.csv". After extracting them, all tuples whose first attribute is uneven
(mod 2) are dropped.
```
```C++
typedef TuplePtr<Tuple<int,int,int>> Tin;
Topology t;
......@@ -184,7 +184,7 @@ the function `func`.
The following example reads tuples again from "data.csv". Each tuple consists out of three integer attributes. After the `map` operator, each
tuple only consists out of one integer attribute (the second attribute from inserted tuple).
```
```C++
typedef TuplePtr<Tuple<int,int,int>> Tin;
typedef TuplePtr<Tuple<int>> Tout;
......@@ -205,7 +205,7 @@ The following example calculates the number of already processed tuples and a su
defined, containing the mentioned tuple count and sum. After reading and extracting tuples from file, the state is updated everytime through
the `statefulMap` operator, returning tuple count and sum to next operators.
```
```C++
typedef TuplePtr<Tuple<int,int,int>> Tin;
typedef TuplePtr<Tuple<int, int>> Tout;
......@@ -247,7 +247,7 @@ This operator saves incoming tuples into a file named `fname`. `ffun` is an opti
The following example reads tuples from "data.csv", selecting only tuples whose first attribute is even, and saves them to "resultFile.txt".
```
```C++
typedef TuplePtr<Tuple<int,int,int>> Tin;
Topology t;
......@@ -278,7 +278,7 @@ The second version of the `keyBy` operator uses the N-th column as the key. No s
The following two examples show the use of the `keyBy` operator, doing the same (with different syntax). The key of incoming tuples is set
to first attribute (integer) and can now be used for following functions, like a join or grouping.
```
```C++
typedef TuplePtr<Tuple<int,int,int>> Tin;
Topology t;
......@@ -326,14 +326,44 @@ auto s = t.newStreamFromFile("data.csv")
#### slidingWindow ####
`Pipe<T> Pipe::slidingWindow(wt, sz, ei)`
`Pipe<T> Pipe::slidingWindow(WindowParams::WinType wt, unsigned int sz, unsigned int ei)`
This operator defines a sliding window of the given type and size on the stream. The window type `wt` can be
row (count-based) or range (time-based) for which `sz` specifies the size (in tuples or milliseconds).
In case of a range (time-based) window the `assignTimestamps` operator has to defined before on the stream.
The optional parameter `ei` denotes the eviction interval, i.e. the time interval (in milliseconds) for
for triggering the eviction of tuples from the window.
In the following example a time-based sliding window of 60 seconds is created. The timestamp of the tuples is
taken from the first column.
defines a sliding window of the given type and size on the stream. The window type `w` can be row or range for which `sz` specifies the size.
In case of a range (time-based) window the `assignTimestamp` operator has to defined before.
```C++
Topology t;
auto s = s.createStreamFromFile()
.extract<T1>(',')
.assignTimestamps<0>()
.slidingWindow(WindowParams::RangeWindow, 6000)
...
```
#### tumblingWindow ####
`Pipe<T> Pipe::tumblingWindow(wt, sz)`
`Pipe<T> Pipe::tumblingWindow(WindowParams::WinType wt, unsigned int sz)`
The `tumblingWindow` operator creates a row or range-based tumbling window of the given size `sz`. In
contrast to a sliding window a tumbling window invalidates all tuples as soon as the window is completely
filled - either by its size (row) or time difference of the oldest and most recent tuple. As in
`slidingWindow` a range-based window requires to specify the timestamp column with `assignTimestamps`.
The following example code creates tumbling window that outdates the tuples after every 100 processed
tuples.
```C++
Topology t;
auto s = s.createStreamFromFile()
.extract<T1>(',')
.assignTimestamps<0>()
.tumblingWindow(WindowParams::RowWindow, 100)
...
```
#### queue ####
......@@ -412,7 +442,33 @@ auto s = t.newStreamFromFile("data.csv")
```
#### join ####
`Pipe<typename SHJoin<T, T2, KeyType>::ResultElement> Pipe::join(otherPipe, pred)`
`Pipe<typename SHJoin<T, T2, KeyType>::ResultElement> Pipe::join<KeyType, T2>(Pipe<T2>& otherPipe, std::function<bool (T&, T2&)> pred)`
The `join` operator implements a symmetric hash join for joining the current stream with a second stream
represented by `otherPipe`. The match of two tuples from both streams is determined by hashing the keys. Thus,
for both streams (topologies) the `keyBy` operator has to be used before to determine the join key. In addition,
a predicate `pred` for comparing the tuples has to be specified which could use only the keys or additional
columns.
The following example illustrates the usage of the join operator:
```C++
typedef TuplePtr<Tuple<int, int, double>> T1;
typedef TuplePtr<Tuple<int, int, std::string>> T2;
Topology t;
auto s1 = t.newStreamFromFile("file2.csv")
.extract<T2>(',')
.keyBy<int>([](auto tp) { return get<0>(tp); });
auto s2 = t.newStreamFromFile("file1.csv")
.extract<T1>(',')
.keyBy<int>([](auto tp) { return get<0>(tp); })
.join<int>(s1, [](auto tp1, auto tp2) { return get<0>(tp1) == get<0>(tp2) && get<1>(tp1) == get<1>(tp2); })
.print(strm);
```
#### notify ####
......@@ -496,7 +552,11 @@ auto s = t.newStreamFromFile("file.csv")
#### toStream ####
`Pipe<T> Pipe::toStream(stream)`
`Pipe<T> Pipe::toStream(Dataflow::BaseOpPtr stream)`
The `toStream` operator sends all tuples of the stream to the stream denoted by `stream`. This stream has
to be created first with the `createStream` method of the current context (from the `PFabricContext` class)
and has to be defined with the same schema (tuple type). See `fromStream` for an example.
#### sendZMQ ####
......@@ -512,7 +572,7 @@ auto s = t.newStreamFromFile("file.csv")
#### toTable ####
`Pipe<T> Pipe::toTable(tbl, autoCommit)`
`Pipe<T> Pipe::toTable(TablePtr tbl, bool autoCommit)`
This operator stores tuples from the input stream of type `T` with key type `K` into the table `tbl` and forwards them to its
subscribers. `TablePtr` is of type `std::shared_ptr<Table<T, K> >`. Outdated tuples are handled as deletes, non-outdated tuples
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment