/*
 * Copyright (c) 2014-17 The PipeFabric team,
 *                       All Rights Reserved.
 *
 * This file is part of the PipeFabric package.
 *
 * PipeFabric is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License (GPL) as
 * published by the Free Software Foundation; either version 2 of
 * the License, or (at your option) any later version.
 *
 * This package is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; see the file LICENSE.
 * If not you can find the GPL at http://www.gnu.org/copyleft/gpl.html
 */
#ifndef Topology_hpp_
#define Topology_hpp_

#include <chrono>
#include <functional>
#include <future>
#include <list>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

#include "core/Tuple.hpp"

#include "qop/DataSink.hpp"
#include "qop/DataSource.hpp"
#include "qop/OperatorMacros.hpp"
#include "qop/TextFileSource.hpp"
#include "qop/RESTSource.hpp"
#include "qop/ZMQSource.hpp"
#include "qop/MemorySource.hpp"
#include "qop/ToTable.hpp"
#include "qop/FromTable.hpp"
#include "qop/SelectFromTable.hpp"
#include "qop/StreamGenerator.hpp"
#ifdef SUPPORT_MATRICES
  #include "qop/FromMatrix.hpp"
#endif

#include "dsl/Pipe.hpp"
#include "dsl/Dataflow.hpp"

#ifdef USE_RABBITMQ
  #include "net/RabbitMQSource.hpp"
#endif
#ifdef USE_KAFKA
  #include "net/KafkaSource.hpp"
#endif
#ifdef USE_MQTT
  #include "net/MQTTSource.hpp"
#endif

namespace pfabric {

  /**
   * @brief A topology represents a dataflow graph of operators.
   *
   * Topology is the main entry point for a stream processing query. It is used
   * to create pipes with data sources as publishers which can be used to connect
   * other stream operators.
   *
   * The following snippet shows an example of using the Topology class.
   *
   * @code
   * // T1 and T2 a typedefs of TuplePtr
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
72
73
74
   * TopologyPtr t = ctx.createTopology();
   *
   * auto s = t->newStreamFromFile("file.csv")
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
75
76
   *           .extract<T1>(',')
   *           .where<T1>([](auto tp, bool outdated) {
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
77
   *                     return get<0>(tp) % 2 == 0;
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
78
   *            })
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
79
80
81
   *           .map<T1,T2>([](auto tp, bool) -> T2 {
   *                     return makeTuplePtr(get<2>(tp),
   *                                         get<0>(tp));
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
82
83
84
   *            })
   *           .print<T2>(strm);
   * // now, let's start the processing
85
   * t->start();
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
86
87
88
89
90
91
92
   * @endcode
   */
  class Topology {
  private:
    /// the signature of a startup function
    typedef std::function<unsigned long()> StartupFunc;

93
//    std::list<Pipe*> pipes;               //< the list of pipes created for this topology
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
94
    std::vector<StartupFunc> startupList; //< the list of functions to be called for startup
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
95
    std::vector<StartupFunc> prepareList; //< the list of functions to be called for startup
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
96
97
    bool asyncStarted;                    //< true if we started asynchronously
    std::vector<std::future<unsigned long> > startupFutures; //< futures for the startup functions
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
98
    std::vector<std::thread> wakeupTimers; //< threads for runEvery queries
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
99
    std::mutex mMutex;                    //< mutex for accessing startupFutures
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
100

101
102
    DataflowPtr dataflow;

Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
103
104
105
106
107
108
109
110
111
112
113
114
115
    /**
     * @brief Registers a startup function for initiating the processing.
     *
     * Registers the given function as a startup function of an operator. This is
     * required for all query operators requiring an explicit invocation of a method.
     * A startup function is called and executed asynchronously after @c start
     * is invoked.
     *
     * @param[in] func
     *    a function pointer for the startup member function
     */
    void registerStartupFunction(StartupFunc func);

Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
116
117
    void registerPrepareFunction(StartupFunc func);

Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
118
119
120
121
122
123
124
125
126
    /**
     * @brief Invokes the start functions asynchronously.
     */
    void startAsync();

  public:
    /**
     * @brief Constructs a new empty topology.
     */
127
    Topology() : asyncStarted(false), dataflow(make_shared<Dataflow>()) {}
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148

    /**
     * @brief Destructor for topology.
     */
    ~Topology();

    /**
     * @brief Starts processing of the whole topology.
     *
     * Starts the processing of the topology by invoking the start
     * functions of all operators acting as data source. The start
     * functions can be called either synchronously, i.e. one start
     * function after another, or asynchronously where the functions
     * run in concurrent threads. In both cases, start returns only
     * after all functions are finished.
     *
     * @param[in] async
     *   determines if the start functions should be invoked asynchronously
     */
    void start(bool async = true);

Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
149
150
    void prepare();

Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
151
152
153
154
155
156
157
158
159
160
161
162
    /**
     * @brief Runs the topology periodically every @c secs seconds.
     *
     * Starts the processing of the topology every @c secs seconds. Note,
     * that the topology should be a finite query not a continuous stream
     * query.
     *
     * @param[in] secs
     *  the period of time between two invocations
     */ 
    void runEvery(const std::chrono::seconds& secs);

Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
163
164
165
166
167
168
169
170
    /**
     * @brief Waits until the execution of the topology stopped.
     *
     * If the topology was started asynchronously the call of wait()
     * blocks until the execution stopped.
     */
    void wait();

Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
171
172
173
174
175
176
177
178
179
    /**
     * @brief Creates a pipe from a TextFileSource as input.
     *
     * Creates a new pipe for reading tuples (containing only a
     * string field representing a line of the file) via a
     * TextFileSource.
     *
     * @param[in] fname
     *    the name of the file from which the tuples are read.
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
180
181
     * @param[in] limit
     *    maximum number of tuples to be read (default == 0 => read until EOF)
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
182
183
184
     * @return
     *    a new pipe where TextFileSource acts as a producer.
     */
185
    Pipe<TStringPtr> newStreamFromFile(const std::string& fname, unsigned long limit = 0);
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
186

Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
    /**
     * @brief Creates a pipe from a REST source as input.
     *
     * Creates a new pipe for receiving tuples via a REST server.
     * Each call of the REST service produces a single tuple (consisting
     * of a single string).
     *
     * @param[in] port
     *    the TCP port for receiving REST calls
     * @param[in] path
     *    the local part (path) of the REST URI
     * @param[in] method
     *    the REST method for invoking the service (GET, PUT, POST)
     * @param[in] numThreads
     *    the number of threads to run the service
     * @return
     *    a new pipe where RESTSource acts as a producer.
     */
205
    Pipe<TStringPtr> newStreamFromREST(unsigned int port,
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
206
207
208
                            const std::string& path,
                            RESTSource::RESTMethod method,
                            unsigned short numThreads = 1);
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
209

210
#ifdef USE_RABBITMQ
211
212
213
214
215
    /**
     * @brief Creates a pipe from a RabbitMQ source as input.
     *
     * Creates a new pipe for receiving tuples via AMQP server (RabbitMQ).
     * It reads messages from the AMQP queue and forwards them as tuples
216
     * to the subscribers, as long as there are messages on the server.
217
218
219
220
     *
     * @param[in] info
     *    a string containing password, user, address and port of the server
     *    format: "password:user@address:port", e.g. "guest:guest@localhost:5672"
221
222
     * @param[in] queueName
     *    a string containing the name of the queue for exchanging tuples, e.g. "queue"
223
224
225
     * @return
     *    a new pipe where RabbitMQSource acts as a producer.
     */
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
    Pipe<TStringPtr> newStreamFromRabbitMQ(const std::string& info, const std::string& queueName);
#endif

#ifdef USE_KAFKA
    /**
     * @brief Creates a pipe from an Apache Kafka source as input.
     *
     * Creates a new pipe for receiving tuples via Apache Kafka protocol.
     *
     * @param[in] broker
     *    the node(s) where the Kafka server runs on,
     *    e.g. "127.0.0.1:9092" for localhost
     * @param[in] topic
     *    the topic where the data is stored (Kafka property)
     * @param[in] groupID
     *    the ID of the group the consumer belongs to
     * @return
     *    a new pipe where KafkaSource acts as a producer.
     */
    Pipe<TStringPtr> newStreamFromKafka(const std::string& broker, const std::string& topic,
                                        const std::string& groupID);
247
248
#endif

Constantin Pohl's avatar
Constantin Pohl committed
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
#ifdef USE_MQTT
    /**
     * @brief Creates a pipe from a MQTT source as input.
     *
     * Creates a new pipe for receiving tuples via MQTT.
     *
     * @param[in] conn
     *    server connection info, e.g. "tcp://localhost:1883"
     * @param[in] channel
     *    the name of the channel to listen on
     * @return
     *    a new pipe where MQTTSource acts as a producer.
     */
    Pipe<TStringPtr> newStreamFromMQTT(const std::string& conn, const std::string& channel);
#endif

Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
    /**
     * @brief Creates a pipe from a ZMQ source as input.
     *
     * Creates a new pipe for receiving tuples via ZMQ and sent them
     * over the stream either as one string (tuple) per message or
     * binary encoded.
     *
     * @param[in] path
     *    the path describing the network connection endpoint, e.g.
     *    tcp://localhost:5678 for a TCP connection at port 5678
     * @param[in] encoding
     *    the encoding used for sending data over ZMQ (ASCII or binary)
     * @param[in] stype
     *    the communication type as provided by ZMQ (Subscriber for PubSub
     *    or Pull).
     * @return
     *    a new pipe where ZMQSource acts as a producer.
     */
283
284
285
286
    Pipe<TStringPtr> newAsciiStreamFromZMQ(const std::string& path,
      ZMQParams::SourceType stype = ZMQParams::SubscriberSource);

    Pipe<TBufPtr> newBinaryStreamFromZMQ(const std::string& path,
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
287
288
      ZMQParams::SourceType stype = ZMQParams::SubscriberSource);

Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
    /**
     * @brief Creates a pipe for monitoring updates on a table.
     *
     * Creates a new pipe for producing a stream from updates on a
     * table. Each update creates a tuple sent to the stream. Updates
     * can either sent immediately (@c Immediate) or after the commit
     * of an transaction.
     *
     * @tparam T
     *    the record type of the table, usually a @c TuplePtr<Tuple<...> >
     * @tparam KeyType
     *    the data type of the key of the table
     * @param[in] tbl
     *    the table acting as the source for the stream.
     * @param[in] mode
     *    the monitoring mode (@c Immediate or @c OnCommit)
     * @return
     *    a new pipe where RESTSource acts as a producer.
     */
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
308
    template<typename T, typename KeyType = DefaultKeyType>
309
    Pipe<T> newStreamFromTable(std::shared_ptr<Table<typename T::element_type, KeyType>> tbl,
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
310
                             TableParams::NotificationMode mode = TableParams::Immediate) {
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
311
      auto op = std::make_shared<FromTable<T, KeyType>>(tbl, mode);
312
      return Pipe<T>(dataflow, dataflow->addPublisher(op));
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
313
    }
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
314

315
#ifdef SUPPORT_MATRICES
316
    /**
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
     * @brief Create a pipe for stream from matrix
     * @tparam Matrix
     *   matrix type
     * @tparam Matrix::StreamElement
     *   record type of the matrix like @c TuplePtr< int, int, double >
     * @param[in] matrix
     *   the matrix is source of stream.
     * @return
     *   a new pipe with new stream.
     */
    template<typename Matrix>
    Pipe<typename Matrix::StreamElement> newStreamFromMatrix(std::shared_ptr<Matrix> matrix) {
      auto op = std::make_shared<FromMatrix< Matrix > >(matrix);
      return Pipe<typename Matrix::StreamElement>(dataflow, dataflow->addPublisher(op));
    }
#endif

      /**
335
336
337
338
339
340
341
342
343
     * @brief Create a new pipe where a named stream is used as input.
     *
     * @tparam T the type of the stream element
     * @param[in] stream
     *    the named stream object from which the pipe receives the data
     * @return
     *    a new pipe where the stream acts as the producer.
     */
    template <typename T>
344
    Pipe<T> fromStream(Dataflow::BaseOpPtr stream) throw (TopologyException) {
345
346
347
348
349
      // check whether stream is a Queue<T> operator
      auto pOp = dynamic_cast<Queue<T>*>(stream.get());
      if (pOp == nullptr) {
        throw TopologyException("Incompatible tuple type of stream object.");
      }
350
      return Pipe<T>(dataflow, dataflow->addPublisher(stream));
351
352
    }

Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
    /**
     * @brief Create a SeletFromTable operator as data source.
     *
     * Create a new SelectFromTable operator that produces a stream of tuples
     * from the given table.
     *
     * @tparam T
     *    the record type of the table, usually a @c TuplePtr<Tuple<...> >
     * @tparam KeyType
     *    the data type of the key of the table
     * @param tbl
     *    the table that is read
     * @param pred
     *    an optional filter predicate
     * @return
     *    a new pipe where the table acts as the source
     */
    template<typename T, typename KeyType = DefaultKeyType>
371
    Pipe<T> selectFromTable(std::shared_ptr<Table<typename T::element_type, KeyType>> tbl,
372
        typename Table<typename T::element_type, KeyType>::Predicate pred = nullptr) {
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
373
374
      auto op = std::make_shared<SelectFromTable<T, KeyType>>(tbl, pred);
      registerStartupFunction([=]() -> unsigned long { return op->start(); });
375
      return Pipe<T>(dataflow, dataflow->addPublisher(op));
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
376
    }
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392

    /**
     * @brief Create a StreamGenerator operator as data source.
     *
     * Create a new StreamGenerator operator that produces a stream of tuples
     * created using the given generator function.
     *
     * @tparam T the type of the stream element
     * @param gen
     *    a generator function for creating the tuples
     * @param num
     *    the number of tuples to be created
     * @return
     *    a new pipe where the generator acts as the source
     */
    template<typename T>
393
    Pipe<T> streamFromGenerator(typename StreamGenerator<T>::Generator gen, unsigned long num) {
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
394
395
      auto op = std::make_shared<StreamGenerator<T>>(gen, num);
      registerStartupFunction([=]() -> unsigned long { return op->start(); });
396
      return Pipe<T>(dataflow, dataflow->addPublisher(op));
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
397
    }
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
398
399
400
401
402
403
404
405

    template<typename T>
    Pipe<T> newStreamFromMemory(const std::string& fname, char delim = ',', unsigned long num = 0) {
      auto op = std::make_shared<MemorySource<T>>(fname, delim, num);
      registerStartupFunction([=]() -> unsigned long { return op->start(); });
      registerPrepareFunction([=]() -> unsigned long { return op->prepare(); });
      return Pipe<T>(dataflow, dataflow->addPublisher(op));
    }
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
406
407
408
409
410
  };

}

#endif