Commit f3c6cc5b authored by Kai-Uwe Sattler's avatar Kai-Uwe Sattler
Browse files

MemorySource added

parent 6b1616e7
......@@ -38,6 +38,10 @@ void Topology::registerStartupFunction(StartupFunc func) {
startupList.push_back(func);
}
void Topology::registerPrepareFunction(StartupFunc func) {
prepareList.push_back(func);
}
void Topology::startAsync() {
// create futures for waiting for the results
// of the start functions
......@@ -59,6 +63,12 @@ void Topology::start(bool async) {
}
}
void Topology::prepare() {
for (auto pFunc : prepareList) {
(pFunc)();
}
}
void Topology::wait() {
if (!asyncStarted)
return;
......
......@@ -35,6 +35,7 @@
#include "qop/TextFileSource.hpp"
#include "qop/RESTSource.hpp"
#include "qop/ZMQSource.hpp"
#include "qop/MemorySource.hpp"
#include "qop/ToTable.hpp"
#include "qop/FromTable.hpp"
#include "qop/SelectFromTable.hpp"
......@@ -79,6 +80,7 @@ namespace pfabric {
// std::list<Pipe*> pipes; //< the list of pipes created for this topology
std::vector<StartupFunc> startupList; //< the list of functions to be called for startup
std::vector<StartupFunc> prepareList; //< the list of functions to be called for startup
bool asyncStarted; //< true if we started asynchronously
std::vector<std::future<unsigned long> > startupFutures; //< futures for the startup functions
std::mutex mMutex; //< mutex for accessing startupFutures
......@@ -98,6 +100,8 @@ namespace pfabric {
*/
void registerStartupFunction(StartupFunc func);
void registerPrepareFunction(StartupFunc func);
/**
* @brief Invokes the start functions asynchronously.
*/
......@@ -129,6 +133,8 @@ namespace pfabric {
*/
void start(bool async = true);
void prepare();
/**
* @brief Waits until the execution of the topology stopped.
*
......@@ -290,6 +296,14 @@ namespace pfabric {
registerStartupFunction([=]() -> unsigned long { return op->start(); });
return Pipe<T>(dataflow, dataflow->addPublisher(op));
}
template<typename T>
Pipe<T> newStreamFromMemory(const std::string& fname, char delim = ',', unsigned long num = 0) {
auto op = std::make_shared<MemorySource<T>>(fname, delim, num);
registerStartupFunction([=]() -> unsigned long { return op->start(); });
registerPrepareFunction([=]() -> unsigned long { return op->prepare(); });
return Pipe<T>(dataflow, dataflow->addPublisher(op));
}
};
}
......
/*
* Copyright (c) 2014-17 The PipeFabric team,
* All Rights Reserved.
*
* This file is part of the PipeFabric package.
*
* PipeFabric is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License (GPL) as
* published by the Free Software Foundation; either version 2 of
* the License, or (at your option) any later version.
*
* This package is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; see the file LICENSE.
* If not you can find the GPL at http://www.gnu.org/copyleft/gpl.html
*/
#ifndef MemorySource_hpp_
#define MemorySource_hpp_
#include <fstream>
#include <iostream>
#include <memory>
#include "core/Punctuation.hpp"
#include "core/Tuple.hpp"
#include "pubsub/channels/ConnectChannels.hpp"
#include "qop/BaseOp.hpp"
#include "qop/DataSource.hpp"
#include "qop/Map.hpp"
#include "qop/Notify.hpp"
#include "qop/OperatorMacros.hpp"
#include "qop/TextFileSource.hpp"
#include "qop/TupleExtractor.hpp"
namespace pfabric {
/**
* @brief TextFileSource is a source operator for reading a text file line by
* line and producing a stream of tuples.
*
* A FileSource is an operator producing a stream of tuples which are extracted
* from a file.
* We assume a simple text file where a record is represented by a separate
* line. The operator produces a stream of @c TStringPtr elements.
*/
template <typename StreamElement>
class MemorySource : public DataSource<StreamElement> {
public:
PFABRIC_SOURCE_TYPEDEFS(StreamElement)
/**
* Creates a new FileSource implementation object for reading data from a file
* and producing tuples.
*
* @param fname the name of the file we read the data from
*/
MemorySource(const std::string& fname, char delim = ',',
unsigned long limit = 0) {
if (limit > 0) data.reserve(limit);
fileSource = std::make_shared<TextFileSource>(fname, limit);
extractor = std::make_shared<TupleExtractor<StreamElement>>(delim);
CREATE_LINK(fileSource, extractor);
notify =
std::make_shared<Notify<StreamElement>>([&](auto tp, bool outdated) {
data.push_back(tp);
});
CREATE_LINK(extractor, notify);
}
/**
* Deallocates all resources.
*/
~MemorySource() {}
unsigned long prepare() { return fileSource->start(); }
/**
* Performs the actual processing by reading the file, parsing the input
* tuples and send
* the to the subscribers. This method has to be invoked explicitly.
*
* @return the number of tuples produced
*/
unsigned long start() {
for (auto tp : data) {
this->getOutputDataChannel().publish(tp, false);
}
this->getOutputPunctuationChannel().publish(
std::make_shared<Punctuation>(Punctuation::EndOfStream));
return data.size();
}
const std::string opName() const override {
return std::string("MemorySource");
}
private:
std::shared_ptr<TextFileSource> fileSource;
std::shared_ptr<TupleExtractor<StreamElement>> extractor;
std::shared_ptr<Notify<StreamElement>> notify;
std::vector<StreamElement> data;
};
}
#endif
......@@ -15,6 +15,7 @@ if (BUILD_TEST_CASES)
do_test(SelectChannelParametersTest)
do_test(ChannelGroupTest)
do_test(TextFileSourceTest)
do_test(MemorySourceTest)
do_test(RESTSourceTest)
do_test(MapTest)
do_test(WhereTest)
......@@ -43,7 +44,7 @@ if (BUILD_TEST_CASES)
do_test(BarrierTest)
do_test(StreamGeneratorTest)
do_test(TuplifierTest)
do_test(BPTreeTest)
# do_test(BPTreeTest)
if (USE_ROCKSDB_TABLE)
do_test(RocksDBTest)
......
#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file
#include <boost/filesystem.hpp>
#include <iostream>
#include "catch.hpp"
#include "fmt/format.h"
#include "core/Tuple.hpp"
#include "qop/MemorySource.hpp"
#include "qop/DataSink.hpp"
#include "qop/OperatorMacros.hpp"
#include "TestDataGenerator.hpp"
using namespace pfabric;
template <typename StreamElement>
class TestConsumer : public SynchronizedDataSink<StreamElement> {
public:
PFABRIC_SYNC_SINK_TYPEDEFS(StreamElement)
TestConsumer() : tupleNum(0) {}
BIND_INPUT_CHANNEL_DEFAULT(InputDataChannel, TestConsumer, processDataElement);
BIND_INPUT_CHANNEL_DEFAULT(InputPunctuationChannel, TestConsumer, processPunctuation);
void processPunctuation(const PunctuationPtr& punctuation) {}
void processDataElement( const StreamElement& data, const bool outdated ) {
auto expected = makeTuplePtr(tupleNum, std::string("This is a string field"), (double)(tupleNum) * 100.0 + 0.5);
REQUIRE(*data == *expected);
tupleNum++;
}
unsigned long numTuples() const { return tupleNum; }
private:
int tupleNum;
};
typedef TuplePtr<int, std::string, double> MyTuple;
TEST_CASE("Preparing a MemorySource from a file", "[MemorySource]" ) {
// create a file of 1000 tuples (one tuple per line)
TestDataGenerator tData("test.csv");
tData.writeData(10000);
auto memSource = std::make_shared<MemorySource<MyTuple>>("test.csv");
auto ntuples = memSource->prepare();
REQUIRE(ntuples == 10000);
auto consumer = std::make_shared<TestConsumer<MyTuple>>();
CREATE_LINK(memSource, consumer);
memSource->start();
REQUIRE(consumer->numTuples() == 10000);
}
TEST_CASE("Preparing a MemorySource from a file with limit", "[MemorySource]" ) {
// create a file of 1000 tuples (one tuple per line)
TestDataGenerator tData("test.csv");
tData.writeData(10000);
auto memSource = std::make_shared<MemorySource<MyTuple>>("test.csv", ',', 100);
auto ntuples = memSource->prepare();
REQUIRE(ntuples == 100);
auto consumer = std::make_shared<TestConsumer<MyTuple>>();
CREATE_LINK(memSource, consumer);
memSource->start();
REQUIRE(consumer->numTuples() == 100);
}
......@@ -34,8 +34,7 @@ public:
std::ofstream ofs(fileName);
for (int i = 0; i < ntuples; i++) {
ofs << fmt::format("{},This is a string field,{}\n", i, i * 100 + 0.5);
// i << "," << "This is a string field" << "," << i * 100 + 0.5 << '\n';
ofs << fmt::format("{},This is a string field,{:.1f}\n", i, (double)(i * 100 + 0.5));
}
ofs.close();
#ifdef COMPRESSED_FILE_SOURCE
......
......@@ -214,6 +214,25 @@ TEST_CASE("Building and running a topology with stream generator", "[StreamGener
testTable->drop();
}
TEST_CASE("Building and running a topology with a memory source", "[MemorySource]") {
typedef TuplePtr<int, std::string, double> T1;
std::vector<T1> results;
TestDataGenerator tgen("file.csv");
tgen.writeData(10);
Topology t;
auto s = t.newStreamFromMemory<T1>("file.csv")
.notify([&](auto tp, bool outdated) {
results.push_back(tp);
});
t.prepare();
t.start(false);
REQUIRE(results.size() == 10);
}
TEST_CASE("Building and running a topology with grouping", "[GroupBy]") {
typedef TuplePtr<int, double> T1;
typedef TuplePtr<double> T2;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment