Commit 3427c738 authored by Kai-Uwe Sattler's avatar Kai-Uwe Sattler
Browse files

changed TuplePtr to use Tuple implictly - no more need to define TuplePtr<Tuple<...>>

parent 03948596
......@@ -68,7 +68,7 @@ For every update on `testTable` a tuple with structure `T1` and keytype `long` (
following operators.
```C++
typedef TuplePtr<Tuple<long, std::string, double>> T1;
typedef TuplePtr<long, std::string, double> T1;
Topology t;
auto s = t.newStreamFromTable<T1, long>(testTable)
......@@ -86,7 +86,7 @@ The following example registers a new named stream object of type `T1` first. Th
stream by an already existing topology. Finally, the stream is then used in `fromStream` in another topology.
```C++
typedef TuplePtr<Tuple<long, std::string, double>> T1;
typedef TuplePtr<long, std::string, double> T1;
PFabricContext ctx;
Dataflow::BaseOpPtr myStream = ctx.createStream<T1>("streamName");
......@@ -111,7 +111,7 @@ both numbers by one for all following tuples, described in a lambda function. `s
forwarding them to the next operators.
```C++
typedef TuplePtr<Tuple<int, int>> T1;
typedef TuplePtr<int, int> T1;
StreamGenerator<T1>::Generator gen ([](unsigned long n) -> T1 {
return makeTuplePtr((int)n, (int)n + 10);
......@@ -132,7 +132,7 @@ This operator processes a stream of strings, splits each tuple using the given s
The following example reads tuples from a file called "data.csv" and extracts tuples out of it, consisting out of three integer attributes.
```
typedef TuplePtr<Tuple<int,int,int>> Tin;
typedef TuplePtr<int,int,int> Tin;
Topology t;
auto s = t.newStreamFromFile("data.csv")
......@@ -149,7 +149,7 @@ The following example reads JSON strings from REST source and extracts them into
(data).
```C++
typedef TuplePtr<Tuple<int, double>> Tin;
typedef TuplePtr<int, double> Tin;
auto s = t.newStreamFromREST(8099, "^/publish$", RESTSource::POST_METHOD)
.extractJson<Tin>({"key", "data"})
......@@ -166,7 +166,7 @@ The following example reads again tuples from a file called "data.csv". After ex
(mod 2) are dropped.
```C++
typedef TuplePtr<Tuple<int,int,int>> Tin;
typedef TuplePtr<int,int,int> Tin;
Topology t;
auto s = t.newStreamFromFile("data.csv")
......@@ -185,8 +185,8 @@ The following example reads tuples again from "data.csv". Each tuple consists ou
tuple only consists out of one integer attribute (the second attribute from inserted tuple).
```C++
typedef TuplePtr<Tuple<int,int,int>> Tin;
typedef TuplePtr<Tuple<int>> Tout;
typedef TuplePtr<int,int,int> Tin;
typedef TuplePtr<int> Tout;
Topology t;
auto s = t.newStreamFromFile("data.csv")
......@@ -206,8 +206,8 @@ defined, containing the mentioned tuple count and sum. After reading and extract
the `statefulMap` operator, returning tuple count and sum to next operators.
```C++
typedef TuplePtr<Tuple<int,int,int>> Tin;
typedef TuplePtr<Tuple<int, int>> Tout;
typedef TuplePtr<int,int,int> Tin;
typedef TuplePtr<int, int> Tout;
struct MyState {
MyState() : cnt(0), sum(0) {}
......@@ -231,8 +231,8 @@ This operator prints each tuple to the stream `s` where the default value for `s
The following example prints all tuples (three integer attributes each) from file to console (per std::cout).
```
typedef TuplePtr<Tuple<int,int,int>> Tin;
```C++
typedef TuplePtr<int,int,int> Tin;
Topology t;
auto s = t.newStreamFromFile("data.csv")
......@@ -248,7 +248,7 @@ This operator saves incoming tuples into a file named `fname`. `ffun` is an opti
The following example reads tuples from "data.csv", selecting only tuples whose first attribute is even, and saves them to "resultFile.txt".
```C++
typedef TuplePtr<Tuple<int,int,int>> Tin;
typedef TuplePtr<int,int,int> Tin;
Topology t;
auto s = t.newStreamFromFile("data.csv")
......@@ -279,7 +279,7 @@ The following two examples show the use of the `keyBy` operator, doing the same
to first attribute (integer) and can now be used for following functions, like a join or grouping.
```C++
typedef TuplePtr<Tuple<int,int,int>> Tin;
typedef TuplePtr<int,int,int> Tin;
Topology t;
auto s = t.newStreamFromFile("data.csv")
......@@ -287,8 +287,8 @@ auto s = t.newStreamFromFile("data.csv")
.keyBy<int>([](auto tp) { return get<0>(tp); })
```
```
typedef TuplePtr<Tuple<int,int,int>> Tin;
```C++
typedef TuplePtr<int,int,int> Tin;
Topology t;
auto s = t.newStreamFromFile("data.csv")
......@@ -307,7 +307,7 @@ a function to calculate the corresponding timestamp. The second method uses a sp
The following two examples show the usage of this operator (with same results on different syntax), both setting the second attribute as timestamp.
```C++
typedef TuplePtr<Tuple<int,std::string,int>> Tin;
typedef TuplePtr<int,std::string,int> Tin;
Topology t;
auto s = t.newStreamFromFile("data.csv")
......@@ -316,7 +316,7 @@ auto s = t.newStreamFromFile("data.csv")
```
```C++
typedef TuplePtr<Tuple<int,std::string,int>> Tin;
typedef TuplePtr<int,std::string,int> Tin;
Topology t;
auto s = t.newStreamFromFile("data.csv")
......@@ -386,7 +386,7 @@ example creates a state class for computing the average, the count, and the sum
of `int` values:
```C++
typedef TuplePtr<Tuple<int>> InTuple;
typedef TuplePtr<int> InTuple;
typedef Aggregator3<
InTuple, // the input type for the aggregation
AggrAvg<int, int>,// an aggregate for calculating the average (input column type = int, output type = int)
......@@ -429,8 +429,8 @@ In order to specify the key for grouping the `keyBy` operator is needed. Note, t
class to store the grouping value in the aggregator class.
```C++
typedef TuplePtr<Tuple<int, int, int>> Tin;
typedef TuplePtr<Tuple<int, int> > AggrRes; // group_id, sum(col1), but not needed here!
typedef TuplePtr<int, int, int> Tin;
typedef TuplePtr<int, int> AggrRes; // group_id, sum(col1), but not needed here!
typedef Aggregator2<AggrRes, AggrIdentity<int>, 0, AggrSum<int>, 1> AggrState;
Topology t;
......@@ -453,8 +453,8 @@ The following example illustrates the usage of the join operator:
```C++
typedef TuplePtr<Tuple<int, int, double>> T1;
typedef TuplePtr<Tuple<int, int, std::string>> T2;
typedef TuplePtr<int, int, double> T1;
typedef TuplePtr<int, int, std::string> T2;
Topology t;
......@@ -479,7 +479,7 @@ tuple. Note that the tuple cannot be modified for the stream (use `map` instead)
conditions if you modify a global state in the callback functions.
```C++
typedef TuplePtr<Tuple<int,std::string,int>> Tin;
typedef TuplePtr<int,std::string,int> Tin;
Topology t;
auto s = t.newStreamFromFile("data.csv")
......@@ -536,7 +536,7 @@ that is forwarded to the subscribers as a single tuple. A batch is defined as fo
```C++
template <typename T>
using BatchPtr = TuplePtr<Tuple<std::vector<std::pair<T, bool>>>>;
using BatchPtr = TuplePtr<std::vector<std::pair<T, bool>>>;
```
where `T` denotes the type of the input tuple and the pairs represent the tuple as well as its outdated flag.
......
......@@ -29,7 +29,7 @@ using namespace pfabric;
BUILDER_CLASS(Query_2)
typedef TuplePtr<Tuple<int, double>> Tuple_1_Type_;
typedef TuplePtr<int, double> Tuple_1_Type_;
PFabricContext::TopologyPtr Query_2::create(PFabricContext& ctx) {
auto SENSOR_DATA = ctx.getTable<Tuple_1_Type_::element_type, int>("SENSOR_DATA");
......
......@@ -15,10 +15,10 @@ Next, we define the schema: the tuple types for representing input and output da
```C++
// the structure of tuples we receive via REST
typedef TuplePtr<Tuple<int, double> > InTuplePtr;
typedef TuplePtr<int, double> InTuplePtr;
// the structure of our output (aggregate) tuples
typedef TuplePtr<Tuple<double> > ResultTuplePtr;
typedef TuplePtr<double> ResultTuplePtr;
```
And for the aggregation we have to define a type the captures the aggregation state.
......
......@@ -4,11 +4,12 @@ The main data structure for representing elements of a data stream is the `Tuple
represents a template class which can parametrized with the attribute types of the element. Note,
that timestamps are not represented separately: timestamps can be derived from any attribute (or
a combination of attributes). Furthermore, tuples are not copied around but only passed by reference.
For this purpose, the `TuplePtr<>` template is used. Thus, a complete schema definition for a stream
For this purpose, usually the `TuplePtr<>` template is used instead of `Tuple` which simply wraps
a tuple with an intrusive pointer. Thus, a complete schema definition for a stream
looks like the following:
```C++
typedef TuplePtr<Tuple<int, std::string, double> > T1;
typedef TuplePtr<int, std::string, double> T1;
```
Tuples can be constructed using the `makeTuplePtr` function which of course requires correctly
......@@ -31,8 +32,8 @@ which allows to specify processing steps in a DSL very similar to Apache Spark.
code snippet gives an example. See below for an explanation of the provided operators.
```C++
typedef TuplePtr<Tuple<int, std::string, double> > T1;
typedef TuplePtr<Tuple<double, int> > T2;
typedef TuplePtr<int, std::string, double> T1;
typedef TuplePtr<double, int> T2;
Topology t;
auto s = t.newStreamFromFile("file.csv")
......
......@@ -54,8 +54,8 @@ using namespace ns_types;
*/
void TopologyMapWhereTest(benchmark::State& state) {
typedef TuplePtr<Tuple<int, std::string, double> > T1;
typedef TuplePtr<Tuple<double, int> > T2;
typedef TuplePtr<int, std::string, double> T1;
typedef TuplePtr<double, int> T2;
TestDataGenerator tgen("file.csv");
tgen.writeData(1000);
......@@ -86,8 +86,8 @@ BENCHMARK(TopologyMapWhereTest);
*/
void TopologyWhereMapTest(benchmark::State& state) {
typedef TuplePtr<Tuple<int, std::string, double> > T1;
typedef TuplePtr<Tuple<double, int> > T2;
typedef TuplePtr<int, std::string, double> T1;
typedef TuplePtr<double, int> T2;
TestDataGenerator tgen("file.csv");
tgen.writeData(1000);
......@@ -115,8 +115,8 @@ BENCHMARK(TopologyWhereMapTest);
*/
void TopologyPartitionedWhereBeforeMapTest(benchmark::State& state) {
typedef TuplePtr<Tuple<int, std::string, double> > T1;
typedef TuplePtr<Tuple<double, int> > T2;
typedef TuplePtr<int, std::string, double> T1;
typedef TuplePtr<double, int> T2;
TestDataGenerator tgen("file.csv");
tgen.writeData(1000);
......@@ -163,8 +163,8 @@ double doMath(double input) {
*/
void TopologyGroupByTest(benchmark::State& state) {
typedef TuplePtr<Tuple<int, std::string, double> > T1;
typedef TuplePtr<Tuple<double> > T2;
typedef TuplePtr<int, std::string, double> T1;
typedef TuplePtr<double> T2;
typedef Aggregator1<T1, AggrSum<double>, 2> AggrStateSum;
TestDataGenerator tgen("file.csv");
......@@ -194,8 +194,8 @@ BENCHMARK(TopologyGroupByTest);
*/
void TopologyPartitionedGroupByTest(benchmark::State& state) {
typedef TuplePtr<Tuple<int, std::string, double> > T1;
typedef TuplePtr<Tuple<double> > T2;
typedef TuplePtr<int, std::string, double> T1;
typedef TuplePtr<double> T2;
typedef Aggregator1<T1, AggrSum<double>, 2> AggrStateSum;
TestDataGenerator tgen("file.csv");
......
......@@ -68,10 +68,14 @@ class Punctuation;
typedef std::shared_ptr< Punctuation > PunctuationPtr;
#if 0
/// a pointer to a generic tuple
template< typename TupleType >
using TuplePtr = boost::intrusive_ptr< TupleType >;
#endif
template< typename T >
using SmartPtr = boost::intrusive_ptr< T >;
class BaseAggregateState;
......
......@@ -348,6 +348,9 @@ private:
std::bitset<std::tuple_size<Base>::value > mNulls; //< a bitset where a bit indicates that the corresponding field contains a null value
};
template< typename ...Types >
using TuplePtr = boost::intrusive_ptr<Tuple<Types...> >;
} // namespace pfabric
......@@ -384,8 +387,8 @@ std::ostream& operator<< (std::ostream& os, const pfabric::Tuple<Types ...>& tp)
* @param[in] tp
* @return
*/
template<typename TupleType>
std::ostream& operator<< (std::ostream& os, const pfabric::TuplePtr< TupleType >& tp) {
template<typename ...Types>
std::ostream& operator<< (std::ostream& os, const pfabric::TuplePtr<Types ...>& tp) {
os << *tp;
return os;
}
......
......@@ -48,7 +48,7 @@ public:
*/
template< typename... Types >
struct getElementType {
typedef TuplePtr< Tuple< typename std::decay< Types >::type... > > type;
typedef TuplePtr< typename std::decay< Types >::type... > type;
};
/**
......@@ -66,7 +66,7 @@ public:
template< typename... Args >
static typename getElementType< Args... >::type create( Args&&... args ) {
typedef Tuple< typename std::decay< Args >::type... > ResultTuple;
return TuplePtr< ResultTuple >( new ResultTuple( std::forward< Args >(args)...) );
return boost::intrusive_ptr< ResultTuple >( new ResultTuple( std::forward< Args >(args)...) );
}
};
......
......@@ -5,10 +5,11 @@
using namespace pfabric;
// the structure of tuples we receive via REST
typedef TuplePtr<Tuple<int, double> > InTuplePtr;
// typedef TuplePtr<Tuple<int, double> > InTuplePtr;
typedef TuplePtr<int, double> InTuplePtr;
// the structure of aggregate tuples
typedef TuplePtr<Tuple<double> > ResultTuplePtr;
// typedef TuplePtr<Tuple<double> > ResultTuplePtr;
// the aggregate operator needs a state object that is defined here:
// template parameters are: the input type,
......
......@@ -13,7 +13,7 @@ using namespace pfabric;
namespace po = boost::program_options;
typedef TuplePtr<Tuple<int, double> > InTuplePtr;
typedef TuplePtr<int, double> InTuplePtr;
PFabricContext::TopologyPtr createStreamQuery(PFabricContext& ctx) {
auto myTable = ctx.getTable<InTuplePtr::element_type, int>("SENSOR_DATA");
......
......@@ -120,7 +120,7 @@ public:
/**
* The tuple type representing the aggregation result.
*/
typedef TuplePtr<Tuple<typename Aggr1Func::ResultType>> ResultTypePtr;
typedef TuplePtr<typename Aggr1Func::ResultType> ResultTypePtr;
/**
* Typedef for a pointer to the aggregation state.
......@@ -196,8 +196,8 @@ public:
/**
* The tuple type representing the aggregation result.
*/
typedef TuplePtr<Tuple<typename Aggr1Func::ResultType,
typename Aggr2Func::ResultType>> ResultTypePtr;
typedef TuplePtr<typename Aggr1Func::ResultType,
typename Aggr2Func::ResultType> ResultTypePtr;
/**
* Typedef for a pointer to the aggregation state.
*/
......@@ -289,9 +289,9 @@ public:
/**
* The tuple type representing the aggregation result.
*/
typedef TuplePtr<Tuple<typename Aggr1Func::ResultType,
typename Aggr2Func::ResultType,
typename Aggr3Func::ResultType>> ResultTypePtr;
typedef TuplePtr<typename Aggr1Func::ResultType,
typename Aggr2Func::ResultType,
typename Aggr3Func::ResultType> ResultTypePtr;
/**
* Typedef for a pointer to the aggregation state.
......@@ -395,10 +395,10 @@ public:
/**
* The tuple type representing the aggregation result.
*/
typedef TuplePtr<Tuple<typename Aggr1Func::ResultType,
typename Aggr2Func::ResultType,
typename Aggr3Func::ResultType,
typename Aggr4Func::ResultType>> ResultTypePtr;
typedef TuplePtr<typename Aggr1Func::ResultType,
typename Aggr2Func::ResultType,
typename Aggr3Func::ResultType,
typename Aggr4Func::ResultType> ResultTypePtr;
/**
* Typedef for a pointer to the aggregation state.
......
......@@ -30,7 +30,7 @@
namespace pfabric {
template <typename InputStreamElement>
using BatchPtr = TuplePtr<Tuple<std::vector<std::pair<InputStreamElement, bool>>>>;
using BatchPtr = TuplePtr<std::vector<std::pair<InputStreamElement, bool>>>;
template <typename InputStreamElement>
class Batcher : public UnaryTransform< InputStreamElement, BatchPtr<InputStreamElement> > // use default unary transform
......
......@@ -35,7 +35,7 @@
namespace pfabric {
typedef Tuple<StringRef> TString; //< a tuple containing a line of text
typedef TuplePtr<TString> TStringPtr; //< tuple pointer
typedef TuplePtr<StringRef> TStringPtr; //< tuple pointer
/**
* @brief TextFileSource is a source operator for reading a text file line by
......
......@@ -46,7 +46,7 @@ namespace pfabric {
* Typedefs for a tuple containing only a byte array for serializing tuples.
*/
typedef Tuple<StreamType> TBuf;
typedef TuplePtr<TBuf> TBufPtr;
typedef TuplePtr<StreamType> TBufPtr;
/**
* ZMQSourceImpl provides the basic implementation of ZMQSource to
......
......@@ -64,8 +64,8 @@ public:
HashMapIterator operator++(int) { auto tmp = *this; ++(*this); return tmp; }
bool isValid() const { return i != end; }
TuplePtr<RecordType> operator*() {
return TuplePtr<RecordType> (new RecordType(i->second));
SmartPtr<RecordType> operator*() {
return SmartPtr<RecordType> (new RecordType(i->second));
}
// typename Iter::value_type::second_type* operator->() { return &i->second; }
......@@ -312,14 +312,14 @@ public:
* @param key the key value
* @return the tuple associated with the given key
*/
const TuplePtr<RecordType> getByKey(KeyType key) throw (TableException) {
const SmartPtr<RecordType> getByKey(KeyType key) throw (TableException) {
// make sure we have exclusive access
std::lock_guard<std::mutex> lock(mMtx);
auto res = mDataTable.find(key);
if (res != mDataTable.end()) {
// if we found the tuple we return a TuplePtr containing a copy of it
TuplePtr<RecordType> tptr (new RecordType(res->second));
SmartPtr<RecordType> tptr (new RecordType(res->second));
return tptr;
}
else
......
......@@ -79,7 +79,7 @@ class RDBTableIterator {
public:
typedef std::function<bool(const RecordType&)> Predicate;
typedef TuplePtr<RecordType> RecordTypePtr;
typedef SmartPtr<RecordType> RecordTypePtr;
explicit RDBTableIterator() {}
explicit RDBTableIterator(rocksdb::Iterator* i, Predicate p) : pred(p) {
......@@ -108,7 +108,7 @@ class RDBTableIterator {
bool isValid() const { return iter->Valid(); }
RecordTypePtr operator*() {
TuplePtr<RecordType> tptr;
SmartPtr<RecordType> tptr;
tptr.reset(pfabric::detail::sliceToTuplePtr<RecordType>(iter->value()));
return tptr;
}
......@@ -401,13 +401,13 @@ class RDBTable : public BaseTable {
* @param key the key value
* @return the tuple associated with the given key
*/
TuplePtr<RecordType> getByKey(KeyType key) throw(TableException) {
SmartPtr<RecordType> getByKey(KeyType key) throw(TableException) {
std::string resultData;
auto status =
db->Get(readOptions, pfabric::detail::valToSlice(key), &resultData);
if (status.ok()) {
// if we found the tuple we just return it
TuplePtr<RecordType> tptr;
SmartPtr<RecordType> tptr;
tptr.reset(pfabric::detail::sliceToTuplePtr<RecordType>(
rocksdb::Slice(resultData.data(), resultData.size())));
return tptr;
......
......@@ -6,7 +6,7 @@ using namespace pfabric;
std::string TableInfo::generateTypeDef() const {
std::ostringstream os;
bool first = true;
os << "TuplePtr<Tuple<";
os << "TuplePtr<";
for (auto& col : mColumns) {
if (first)
first = false;
......@@ -14,7 +14,7 @@ std::string TableInfo::generateTypeDef() const {
os << ", ";
os << col.mColType;
}
os << ">>";
os << ">";
return os.str();
}
......
......@@ -198,7 +198,7 @@ TEST_CASE("Test AggrIdentity function", "[AggregateFunc]") {
AggrIdentity<std::string> aggr2;
std::vector<std::string> data = { "aaa", "bbb", "ccc", "ddd", "eee" };
for (int i = 0; i < data.size(); i++)
for (unsigned i = 0; i < data.size(); i++)
aggr2.iterate(data[i]);
REQUIRE(aggr2.value() == "eee");
......
......@@ -24,13 +24,10 @@
using namespace pfabric;
typedef Tuple<double> InTuple;
typedef TuplePtr<InTuple> InTuplePtr;
typedef Tuple<double, double, int> OutTuple;
typedef TuplePtr<OutTuple> OutTuplePtr;
typedef TuplePtr<double> InTuplePtr;
typedef TuplePtr<double, double, int> OutTuplePtr;
typedef Tuple<double, double, double, double> Out2Tuple;
typedef TuplePtr<Out2Tuple> Out2TuplePtr;
typedef TuplePtr<double, double, double, double> Out2TuplePtr;
TEST_CASE( "Compute a simple aggregate on the entire stream", "[Aggregation]" ) {
typedef Aggregator3<InTuplePtr,
......
......@@ -19,8 +19,7 @@
using namespace pfabric;
typedef Tuple<int> MyTuple;
typedef TuplePtr<MyTuple> MyTuplePtr;
typedef TuplePtr<int> MyTuplePtr;
struct BarrierCounter {
std::atomic<int> counter;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment