Topology.cpp 5.37 KB
Newer Older
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
 * Copyright (c) 2014-16 The PipeFabric team,
 *                       All Rights Reserved.
 *
 * This file is part of the PipeFabric package.
 *
 * PipeFabric is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License (GPL) as
 * published by the Free Software Foundation; either version 2 of
 * the License, or (at your option) any later version.
 *
 * This package is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; see the file LICENSE.
 * If not you can find the GPL at http://www.gnu.org/copyleft/gpl.html
 */
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
21

Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
22
23
24
25
26
27
28
29

#include "Topology.hpp"
#include "qop/ZMQSource.hpp"

using namespace pfabric;

/**
 * Destructor. Currently performs no explicit cleanup.
 */
Topology::~Topology() {
  // The explicit deletion of the created pipes is disabled; the former
  // cleanup code is kept here for reference only:
  //
  //   for (auto i : pipes) {
  //     delete i;
  //   }
  //
  // NOTE(review): threads pushed into wakeupTimers by runEvery() are not
  // joined here - confirm how they are expected to be shut down.
}

/**
 * Appends a function to the list of startup functions, which are
 * invoked by start() / startAsync().
 *
 * @param func the startup function to register
 */
void Topology::registerStartupFunction(StartupFunc func) {
  startupList.emplace_back(func);
}

Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
41
42
43
44
/**
 * Appends a function to the list of prepare functions, which are
 * invoked by prepare().
 *
 * @param func the prepare function to register
 */
void Topology::registerPrepareFunction(StartupFunc func) {
  prepareList.emplace_back(func);
}

Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
45
46
47
void Topology::startAsync() {
  // create futures for waiting for the results
  // of the start functions
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
48
  std::lock_guard<std::mutex> guard(mMutex);
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
49
50
  for (auto sFunc : startupList) {
    // make sure the function is launched asynchronously in a separate thread
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
51
    startupFutures.push_back(std::async(std::launch::async, sFunc));
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
52
  }
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
53
54
  asyncStarted = true;

Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
55
56
57
58
59
60
61
62
63
64
65
}

/**
 * Starts the topology by invoking all registered startup functions.
 *
 * @param async if true, the functions are launched via startAsync();
 *              otherwise they are called one after another in the
 *              calling thread
 */
void Topology::start(bool async) {
  if (async) {
    startAsync();
    return;
  }

  // synchronous mode: run the startup functions in registration order
  for (const auto& launcher : startupList) {
    launcher();
  }
}

Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
66
67
68
69
70
71
/**
 * Invokes all registered prepare functions one after another in the
 * calling thread.
 */
void Topology::prepare() {
  for (auto it = prepareList.begin(); it != prepareList.end(); ++it) {
    (*it)();
  }
}

Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
72
73
74
75
76
77
78
79
80
81
/**
 * Blocks until all startup functions launched by startAsync() have finished.
 *
 * Returns immediately if the topology was not started asynchronously.
 * The method may be called repeatedly: futures whose result was already
 * retrieved by an earlier call are skipped.
 */
void Topology::wait() {
  // Acquire the lock before looking at asyncStarted: startAsync() modifies
  // the flag and the future list while holding mMutex.
  std::lock_guard<std::mutex> guard(mMutex);

  if (!asyncStarted)
    return;

  // let's wait until the functions have finished
  for (auto& f : startupFutures) {
    // get() may be called only once per future; afterwards valid() is
    // false and another get() would be undefined behavior.
    if (f.valid())
      f.get();
  }
}

Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
82
83
84
85
86
87
88
89
90
void Topology::runEvery(const std::chrono::seconds& secs) {
  wakeupTimers.push_back(std::thread([&](){
        while(true) {
          std::this_thread::sleep_for(secs);
          startAsync();
        }
  }));
}
    
91
/**
 * Creates a TextFileSource operator as the source of a new stream.
 *
 * The source's start() method is registered as a startup function and the
 * operator is added to the dataflow as a publisher.
 *
 * @param fname the file name, forwarded to the TextFileSource constructor
 * @param limit forwarded to the TextFileSource constructor
 * @return a Pipe built from the dataflow and the newly added publisher
 */
Pipe<TStringPtr> Topology::newStreamFromFile(const std::string& fname, unsigned long limit) {
  auto source = std::make_shared<TextFileSource>(fname, limit);

  // The startup function only holds a plain pointer to the operator;
  // the shared_ptr itself is handed over to the dataflow below.
  auto* rawSource = source.get();
  registerStartupFunction([rawSource]() { return rawSource->start(); });

  return Pipe<TStringPtr>(dataflow, dataflow->addPublisher(source));
}

101
102
#ifdef USE_RABBITMQ
/**
 * Creates a RabbitMQSource operator as the source of a new stream.
 *
 * The source's start() method is registered as a startup function and the
 * operator is added to the dataflow as a publisher.
 *
 * @param info forwarded to the RabbitMQSource constructor
 * @param queueName forwarded to the RabbitMQSource constructor
 * @return a Pipe built from the dataflow and the newly added publisher
 */
Pipe<TStringPtr> Topology::newStreamFromRabbitMQ(const std::string& info, const std::string& queueName) {
  auto source = std::make_shared<RabbitMQSource>(info, queueName);

  // The startup function only holds a plain pointer to the operator;
  // the shared_ptr itself is handed over to the dataflow below.
  auto* rawSource = source.get();
  registerStartupFunction([rawSource]() { return rawSource->start(); });

  return Pipe<TStringPtr>(dataflow, dataflow->addPublisher(source));
}
#endif

113
114
115
116
117
118
119
120
121
122
123
124
125
#ifdef USE_KAFKA
/**
 * Creates a KafkaSource operator as the source of a new stream.
 *
 * The source's start() method is registered as a startup function and the
 * operator is added to the dataflow as a publisher.
 *
 * @param broker forwarded to the KafkaSource constructor
 * @param topic forwarded to the KafkaSource constructor
 * @param groupID forwarded to the KafkaSource constructor
 * @return a Pipe built from the dataflow and the newly added publisher
 */
Pipe<TStringPtr> Topology::newStreamFromKafka(const std::string& broker, const std::string& topic,
                                              const std::string& groupID) {
  auto source = std::make_shared<KafkaSource>(broker, topic, groupID);

  // The startup function only holds a plain pointer to the operator;
  // the shared_ptr itself is handed over to the dataflow below.
  auto* rawSource = source.get();
  registerStartupFunction([rawSource]() { return rawSource->start(); });

  return Pipe<TStringPtr>(dataflow, dataflow->addPublisher(source));
}
#endif

Constantin Pohl's avatar
Constantin Pohl committed
126
127
128
129
130
131
132
133
134
135
136
137
#ifdef USE_MQTT
/**
 * Creates a MQTTSource operator as the source of a new stream.
 *
 * The source's start() method is registered as a startup function and the
 * operator is added to the dataflow as a publisher.
 *
 * @param conn forwarded to the MQTTSource constructor
 * @param channel forwarded to the MQTTSource constructor
 * @return a Pipe built from the dataflow and the newly added publisher
 */
Pipe<TStringPtr> Topology::newStreamFromMQTT(const std::string& conn, const std::string& channel) {
  auto source = std::make_shared<MQTTSource>(conn, channel);

  // The startup function only holds a plain pointer to the operator;
  // the shared_ptr itself is handed over to the dataflow below.
  auto* rawSource = source.get();
  registerStartupFunction([rawSource]() { return rawSource->start(); });

  return Pipe<TStringPtr>(dataflow, dataflow->addPublisher(source));
}
#endif

138
/**
 * Creates a RESTSource operator as the source of a new stream.
 *
 * The source's start() method is registered as a startup function and the
 * operator is added to the dataflow as a publisher.
 *
 * @param port forwarded to the RESTSource constructor
 * @param path forwarded to the RESTSource constructor
 * @param method forwarded to the RESTSource constructor
 * @param numThreads forwarded to the RESTSource constructor
 * @return a Pipe built from the dataflow and the newly added publisher
 */
Pipe<TStringPtr> Topology::newStreamFromREST(unsigned int port,
                                             const std::string& path,
                                             RESTSource::RESTMethod method,
                                             unsigned short numThreads) {
  auto source = std::make_shared<RESTSource>(port, path, method, numThreads);

  // The startup function only holds a plain pointer to the operator;
  // the shared_ptr itself is handed over to the dataflow below.
  auto* rawSource = source.get();
  registerStartupFunction([rawSource]() { return rawSource->start(); });

  return Pipe<TStringPtr>(dataflow, dataflow->addPublisher(source));
}

151
/**
 * Creates a ZMQSource<TStringPtr> operator as the source of a new stream
 * and adds it to the dataflow as a publisher.
 *
 * In contrast to the other stream sources in this file, no startup
 * function is registered for the operator.
 *
 * @param path forwarded to the ZMQSource constructor
 * @param stype forwarded to the ZMQSource constructor
 * @return a Pipe built from the dataflow and the newly added publisher
 */
Pipe<TStringPtr> Topology::newAsciiStreamFromZMQ(const std::string& path,
                                                 ZMQParams::SourceType stype) {
  using AsciiSource = ZMQSource<TStringPtr>;

  auto source = std::make_shared<AsciiSource>(path, stype);
  return Pipe<TStringPtr>(dataflow, dataflow->addPublisher(source));
}

/**
 * Creates a ZMQSource<TBufPtr> operator as the source of a new stream
 * and adds it to the dataflow as a publisher.
 *
 * In contrast to the other stream sources in this file, no startup
 * function is registered for the operator.
 *
 * @param path forwarded to the ZMQSource constructor
 * @param stype forwarded to the ZMQSource constructor
 * @return a Pipe built from the dataflow and the newly added publisher
 */
Pipe<TBufPtr> Topology::newBinaryStreamFromZMQ(const std::string& path,
                                               ZMQParams::SourceType stype) {
  using BinarySource = ZMQSource<TBufPtr>;

  auto source = std::make_shared<BinarySource>(path, stype);
  return Pipe<TBufPtr>(dataflow, dataflow->addPublisher(source));
}