Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
code
pfabric
Commits
36d53940
Commit
36d53940
authored
Dec 06, 2017
by
Philipp Götze
Browse files
Removed deprecated dynamic exceptions and replaced by noexcept(false)
parent
cf18d1ad
Changes
3
Hide whitespace changes
Inline
Side-by-side
src/dsl/PFabricContext.hpp
View file @
36d53940
...
...
@@ -93,7 +93,7 @@ public:
a pointer to the newly created table
*/
template
<
typename
RecordType
,
typename
KeyType
=
DefaultKeyType
>
std
::
shared_ptr
<
Table
<
RecordType
,
KeyType
>>
createTable
(
const
std
::
string
&
tblName
)
throw
(
TableException
)
{
std
::
shared_ptr
<
Table
<
RecordType
,
KeyType
>>
createTable
(
const
std
::
string
&
tblName
)
noexcept
(
false
)
{
// first we check whether the table exists already
auto
it
=
mTableSet
.
find
(
tblName
);
if
(
it
!=
mTableSet
.
end
())
...
...
@@ -106,7 +106,7 @@ public:
}
template
<
typename
RecordType
,
typename
KeyType
=
DefaultKeyType
>
std
::
shared_ptr
<
Table
<
RecordType
,
KeyType
>>
createTable
(
const
TableInfo
&
tblInfo
)
throw
(
TableException
)
{
std
::
shared_ptr
<
Table
<
RecordType
,
KeyType
>>
createTable
(
const
TableInfo
&
tblInfo
)
noexcept
(
false
)
{
// first we check whether the table exists already
auto
it
=
mTableSet
.
find
(
tblInfo
.
tableName
());
if
(
it
!=
mTableSet
.
end
())
...
...
src/dsl/Pipe.hpp
View file @
36d53940
...
...
@@ -147,7 +147,7 @@ class Pipe {
Dataflow
::
BaseOpIterator
getPublishers
()
{
return
tailIter
;
}
template
<
typename
SourceType
>
SourceType
*
castOperator
(
Dataflow
::
BaseOpPtr
opPtr
)
throw
(
TopologyException
)
{
SourceType
*
castOperator
(
Dataflow
::
BaseOpPtr
opPtr
)
noexcept
(
false
)
{
auto
pOp
=
dynamic_cast
<
SourceType
*>
(
opPtr
.
get
());
if
(
pOp
==
nullptr
)
{
throw
TopologyException
(
"Incompatible tuple types in Pipe."
);
...
...
@@ -156,7 +156,7 @@ class Pipe {
}
template
<
typename
SourceType
>
SourceType
*
castOperator
(
BaseOp
*
opPtr
)
throw
(
TopologyException
)
{
SourceType
*
castOperator
(
BaseOp
*
opPtr
)
noexcept
(
false
)
{
auto
pOp
=
dynamic_cast
<
SourceType
*>
(
opPtr
);
if
(
pOp
==
nullptr
)
{
throw
TopologyException
(
"Incompatible tuple types in Pipe."
);
...
...
@@ -165,8 +165,7 @@ class Pipe {
}
template
<
typename
Publisher
,
typename
SourceType
>
OpIterator
addPublisher
(
std
::
shared_ptr
<
Publisher
>
op
)
throw
(
TopologyException
)
{
OpIterator
addPublisher
(
std
::
shared_ptr
<
Publisher
>
op
)
noexcept
(
false
)
{
auto
pOp
=
castOperator
<
SourceType
>
(
getPublisher
());
CREATE_LINK
(
pOp
,
op
);
return
dataflow
->
addPublisher
(
op
);
...
...
@@ -175,7 +174,7 @@ class Pipe {
template
<
typename
T2
,
typename
KeyType
>
OpIterator
addPartitionedJoin
(
std
::
vector
<
std
::
shared_ptr
<
SHJoin
<
T
,
T2
,
KeyType
>>>&
opList
,
DataSource
<
T2
>*
otherOp
,
PartitioningState
otherPartitioningState
)
throw
(
TopologyException
)
{
PartitioningState
otherPartitioningState
)
noexcept
(
false
)
{
typedef
typename
std
::
shared_ptr
<
SHJoin
<
T
,
T2
,
KeyType
>>
JoinOpPtr
;
if
(
partitioningState
==
NoPartitioning
)
throw
TopologyException
(
"Missing partitionBy operator in topology."
);
...
...
@@ -240,7 +239,7 @@ class Pipe {
template
<
typename
Publisher
,
typename
StreamElement
>
OpIterator
addPartitionedPublisher
(
std
::
vector
<
std
::
shared_ptr
<
Publisher
>>&
opList
)
throw
(
TopologyException
)
{
opList
)
noexcept
(
false
)
{
if
(
partitioningState
==
NoPartitioning
)
throw
TopologyException
(
"Missing partitionBy operator in topology."
);
...
...
@@ -382,7 +381,7 @@ class Pipe {
*/
Pipe
<
T
>
slidingWindow
(
const
WindowParams
::
WinType
&
wt
,
const
unsigned
int
sz
,
typename
Window
<
T
>::
WindowOpFunc
windowFunc
=
nullptr
,
const
unsigned
int
ei
=
0
)
throw
(
TableException
)
{
const
unsigned
int
ei
=
0
)
noexcept
(
false
)
{
typedef
typename
Window
<
T
>::
TimestampExtractorFunc
ExtractorFunc
;
ExtractorFunc
fn
;
...
...
@@ -443,7 +442,7 @@ class Pipe {
*/
Pipe
<
T
>
tumblingWindow
(
const
WindowParams
::
WinType
&
wt
,
const
unsigned
int
sz
,
typename
Window
<
T
>::
WindowOpFunc
windowFunc
=
nullptr
)
throw
(
TableException
)
{
typename
Window
<
T
>::
WindowOpFunc
windowFunc
=
nullptr
)
noexcept
(
false
)
{
typedef
typename
Window
<
T
>::
TimestampExtractorFunc
ExtractorFunc
;
ExtractorFunc
fn
;
...
...
@@ -503,7 +502,7 @@ class Pipe {
Pipe
<
T
>
print
(
std
::
ostream
&
os
=
std
::
cout
,
typename
ConsoleWriter
<
T
>::
FormatterFunc
ffun
=
ConsoleWriter
<
T
>::
defaultFormatter
)
throw
(
TopologyException
)
{
ConsoleWriter
<
T
>::
defaultFormatter
)
noexcept
(
false
)
{
assert
(
partitioningState
==
NoPartitioning
);
auto
op
=
std
::
make_shared
<
ConsoleWriter
<
T
>>
(
os
,
ffun
);
auto
pOp
=
castOperator
<
DataSource
<
T
>>
(
getPublisher
());
...
...
@@ -532,7 +531,7 @@ class Pipe {
Pipe
<
T
>
saveToFile
(
const
std
::
string
&
fname
,
typename
FileWriter
<
T
>::
FormatterFunc
ffun
=
ConsoleWriter
<
T
>::
defaultFormatter
)
throw
(
TopologyException
)
{
ConsoleWriter
<
T
>::
defaultFormatter
)
noexcept
(
false
)
{
assert
(
partitioningState
==
NoPartitioning
);
auto
op
=
std
::
make_shared
<
FileWriter
<
T
>>
(
fname
,
ffun
);
auto
pOp
=
castOperator
<
DataSource
<
T
>>
(
getPublisher
());
...
...
@@ -563,7 +562,7 @@ class Pipe {
Pipe
<
T
>
sendZMQ
(
const
std
::
string
&
path
,
ZMQParams
::
SinkType
stype
=
ZMQParams
::
PublisherSink
,
ZMQParams
::
EncodingMode
mode
=
ZMQParams
::
BinaryMode
)
throw
(
TopologyException
)
{
ZMQParams
::
BinaryMode
)
noexcept
(
false
)
{
if
(
partitioningState
==
NoPartitioning
)
{
auto
op
=
std
::
make_shared
<
ZMQSink
<
T
>>
(
path
,
stype
,
mode
);
auto
pOp
=
castOperator
<
DataSource
<
T
>>
(
getPublisher
());
...
...
@@ -598,7 +597,7 @@ class Pipe {
* @return a new pipe
*/
template
<
class
Tout
>
Pipe
<
Tout
>
extract
(
char
sep
)
throw
(
TopologyException
)
{
Pipe
<
Tout
>
extract
(
char
sep
)
noexcept
(
false
)
{
if
(
partitioningState
==
NoPartitioning
)
{
auto
op
=
std
::
make_shared
<
TupleExtractor
<
Tout
>>
(
sep
);
auto
iter
=
...
...
@@ -629,8 +628,7 @@ class Pipe {
* @return a new pipe
*/
template
<
class
Tout
>
Pipe
<
Tout
>
extractJson
(
const
std
::
initializer_list
<
std
::
string
>&
keys
)
throw
(
TopologyException
)
{
Pipe
<
Tout
>
extractJson
(
const
std
::
initializer_list
<
std
::
string
>&
keys
)
noexcept
(
false
)
{
std
::
vector
<
std
::
string
>
keyList
(
keys
);
if
(
partitioningState
==
NoPartitioning
)
{
auto
op
=
std
::
make_shared
<
JsonExtractor
<
Tout
>>
(
keyList
);
...
...
@@ -651,7 +649,7 @@ class Pipe {
/**
* TODO
*/
Pipe
<
BatchPtr
<
T
>>
batch
(
std
::
size_t
bsize
)
throw
(
TopologyException
)
{
Pipe
<
BatchPtr
<
T
>>
batch
(
std
::
size_t
bsize
)
noexcept
(
false
)
{
auto
op
=
std
::
make_shared
<
Batcher
<
T
>>
(
bsize
);
auto
iter
=
addPublisher
<
Batcher
<
T
>
,
DataSource
<
T
>>
(
op
);
return
Pipe
<
BatchPtr
<
T
>>
(
dataflow
,
iter
,
keyExtractor
,
timestampExtractor
,
...
...
@@ -665,7 +663,7 @@ class Pipe {
* @return a new pipe
*/
template
<
class
Tout
>
Pipe
<
Tout
>
deserialize
()
throw
(
TopologyException
)
{
Pipe
<
Tout
>
deserialize
()
noexcept
(
false
)
{
if
(
partitioningState
==
NoPartitioning
)
{
auto
op
=
std
::
make_shared
<
TupleDeserializer
<
Tout
>>
();
auto
iter
=
...
...
@@ -700,8 +698,7 @@ class Pipe {
* value for the input tuple
* @return a new pipe
*/
Pipe
<
T
>
where
(
typename
Where
<
T
>::
PredicateFunc
func
)
throw
(
TopologyException
)
{
Pipe
<
T
>
where
(
typename
Where
<
T
>::
PredicateFunc
func
)
noexcept
(
false
)
{
if
(
partitioningState
==
NoPartitioning
)
{
auto
op
=
std
::
make_shared
<
Where
<
T
>>
(
func
);
auto
iter
=
addPublisher
<
Where
<
T
>
,
DataSource
<
T
>>
(
op
);
...
...
@@ -739,7 +736,7 @@ class Pipe {
*/
Pipe
<
T
>
notify
(
typename
Notify
<
T
>::
CallbackFunc
func
,
typename
Notify
<
T
>::
PunctuationCallbackFunc
pfunc
=
nullptr
)
throw
(
TopologyException
)
{
nullptr
)
noexcept
(
false
)
{
assert
(
partitioningState
==
NoPartitioning
);
auto
op
=
std
::
make_shared
<
Notify
<
T
>>
(
func
,
pfunc
);
...
...
@@ -760,7 +757,7 @@ class Pipe {
* the input tuple type (usually a TuplePtr) for the operator.
* @return a new pipe
*/
Pipe
<
T
>
queue
()
throw
(
TopologyException
)
{
Pipe
<
T
>
queue
()
noexcept
(
false
)
{
if
(
partitioningState
==
NoPartitioning
)
{
auto
op
=
std
::
make_shared
<
Queue
<
T
>>
();
auto
iter
=
addPublisher
<
Queue
<
T
>
,
DataSource
<
T
>>
(
op
);
...
...
@@ -790,7 +787,7 @@ class Pipe {
* the named stream object to which the tuples are sent
* @return a new pipe
*/
Pipe
<
T
>
toStream
(
Dataflow
::
BaseOpPtr
stream
)
throw
(
TopologyException
)
{
Pipe
<
T
>
toStream
(
Dataflow
::
BaseOpPtr
stream
)
noexcept
(
false
)
{
assert
(
partitioningState
==
NoPartitioning
);
auto
queueOp
=
castOperator
<
Queue
<
T
>>
(
stream
);
auto
pOp
=
castOperator
<
DataSource
<
T
>>
(
getPublisher
());
...
...
@@ -818,7 +815,7 @@ class Pipe {
* @return new pipe
*/
template
<
typename
Tout
>
Pipe
<
Tout
>
map
(
typename
Map
<
T
,
Tout
>::
MapFunc
func
)
throw
(
TopologyException
)
{
Pipe
<
Tout
>
map
(
typename
Map
<
T
,
Tout
>::
MapFunc
func
)
noexcept
(
false
)
{
if
(
partitioningState
==
NoPartitioning
)
{
auto
op
=
std
::
make_shared
<
Map
<
T
,
Tout
>>
(
func
);
auto
iter
=
addPublisher
<
Map
<
T
,
Tout
>
,
DataSource
<
T
>>
(
op
);
...
...
@@ -838,7 +835,7 @@ class Pipe {
template
<
typename
Tout
>
Pipe
<
Tout
>
tuplify
(
const
std
::
initializer_list
<
std
::
string
>&
predList
,
TuplifierParams
::
TuplifyMode
m
,
unsigned
int
ws
=
0
)
throw
(
TopologyException
)
{
unsigned
int
ws
=
0
)
noexcept
(
false
)
{
if
(
partitioningState
==
NoPartitioning
)
{
auto
op
=
std
::
make_shared
<
Tuplifier
<
T
,
Tout
>>
(
predList
,
m
,
ws
);
auto
iter
=
addPublisher
<
Tuplifier
<
T
,
Tout
>
,
DataSource
<
T
>>
(
op
);
...
...
@@ -878,7 +875,7 @@ class Pipe {
*/
template
<
typename
Tout
,
typename
State
>
Pipe
<
Tout
>
statefulMap
(
typename
StatefulMap
<
T
,
Tout
,
State
>::
MapFunc
func
)
throw
(
TopologyException
)
{
func
)
noexcept
(
false
)
{
if
(
partitioningState
==
NoPartitioning
)
{
auto
op
=
std
::
make_shared
<
StatefulMap
<
T
,
Tout
,
State
>>
(
func
);
auto
iter
=
addPublisher
<
StatefulMap
<
T
,
Tout
,
State
>
,
DataSource
<
T
>>
(
op
);
...
...
@@ -931,7 +928,7 @@ class Pipe {
template
<
typename
AggrState
>
Pipe
<
typename
AggrState
::
ResultTypePtr
>
aggregate
(
AggregationTriggerType
tType
=
TriggerAll
,
const
unsigned
int
tInterval
=
0
)
throw
(
TopologyException
)
{
const
unsigned
int
tInterval
=
0
)
noexcept
(
false
)
{
static_assert
(
typename
AggrStateTraits
<
AggrState
>::
type
(),
"aggregate requires an AggrState class"
);
return
aggregate
<
typename
AggrState
::
ResultTypePtr
,
AggrState
>
(
AggrState
::
finalize
,
AggrState
::
iterate
,
tType
,
tInterval
);
...
...
@@ -985,7 +982,7 @@ class Pipe {
typename
Aggregation
<
T
,
Tout
,
AggrState
>::
FinalFunc
finalFun
,
typename
Aggregation
<
T
,
Tout
,
AggrState
>::
IterateFunc
iterFun
,
AggregationTriggerType
tType
=
TriggerAll
,
const
unsigned
int
tInterval
=
0
)
throw
(
TopologyException
)
{
const
unsigned
int
tInterval
=
0
)
noexcept
(
false
)
{
if
(
partitioningState
==
NoPartitioning
)
{
auto
op
=
std
::
make_shared
<
Aggregation
<
T
,
Tout
,
AggrState
>>
(
finalFun
,
iterFun
,
tType
,
tInterval
);
...
...
@@ -1033,7 +1030,7 @@ class Pipe {
typename
KeyType
=
DefaultKeyType
>
Pipe
<
typename
AggrState
::
ResultTypePtr
>
groupBy
(
AggregationTriggerType
tType
=
TriggerAll
,
const
unsigned
int
tInterval
=
0
)
throw
(
TopologyException
)
{
const
unsigned
int
tInterval
=
0
)
noexcept
(
false
)
{
static_assert
(
typename
AggrStateTraits
<
AggrState
>::
type
(),
"groupBy requires an AggrState class"
);
return
groupBy
<
typename
AggrState
::
ResultTypePtr
,
AggrState
,
KeyType
>
(
AggrState
::
finalize
,
AggrState
::
iterate
,
tType
,
tInterval
);
...
...
@@ -1080,7 +1077,7 @@ class Pipe {
typename
GroupedAggregation
<
T
,
Tout
,
AggrState
,
KeyType
>::
IterateFunc
iterFun
,
AggregationTriggerType
tType
=
TriggerAll
,
const
unsigned
int
tInterval
=
0
)
throw
(
TopologyException
)
{
const
unsigned
int
tInterval
=
0
)
noexcept
(
false
)
{
try
{
typedef
std
::
function
<
KeyType
(
const
T
&
)
>
KeyExtractorFunc
;
KeyExtractorFunc
keyFunc
=
...
...
@@ -1139,7 +1136,7 @@ class Pipe {
template
<
typename
Tout
,
typename
RelatedValueType
>
Pipe
<
Tout
>
matchByNFA
(
typename
NFAController
<
T
,
Tout
,
RelatedValueType
>::
NFAControllerPtr
nfa
)
throw
(
TopologyException
)
{
nfa
)
noexcept
(
false
)
{
auto
op
=
std
::
make_shared
<
Matcher
<
T
,
Tout
,
RelatedValueType
>>
(
Matcher
<
T
,
Tout
,
RelatedValueType
>::
FirstMatch
);
op
->
setNFAController
(
nfa
);
...
...
@@ -1170,8 +1167,7 @@ class Pipe {
* @return a reference to the pipe
*/
template
<
typename
Tout
,
typename
RelatedValueType
>
Pipe
<
Tout
>
matcher
(
CEPState
<
T
,
RelatedValueType
>&
expr
)
throw
(
TopologyException
)
{
Pipe
<
Tout
>
matcher
(
CEPState
<
T
,
RelatedValueType
>&
expr
)
noexcept
(
false
)
{
assert
(
partitioningState
==
NoPartitioning
);
auto
op
=
std
::
make_shared
<
Matcher
<
T
,
Tout
,
RelatedValueType
>>
(
Matcher
<
T
,
Tout
,
RelatedValueType
>::
FirstMatch
);
...
...
@@ -1210,7 +1206,7 @@ class Pipe {
template
<
typename
KeyType
=
DefaultKeyType
,
typename
T2
>
Pipe
<
typename
SHJoin
<
T
,
T2
,
KeyType
>::
ResultElement
>
join
(
Pipe
<
T2
>&
otherPipe
,
typename
SHJoin
<
T
,
T2
,
KeyType
>::
JoinPredicateFunc
pred
)
throw
(
TopologyException
)
{
pred
)
noexcept
(
false
)
{
typedef
typename
SHJoin
<
T
,
T2
,
KeyType
>::
ResultElement
Tout
;
try
{
typedef
std
::
function
<
KeyType
(
const
T
&
)
>
LKeyExtractorFunc
;
...
...
@@ -1281,7 +1277,7 @@ class Pipe {
*/
template
<
typename
KeyType
=
DefaultKeyType
>
Pipe
<
T
>
toTable
(
std
::
shared_ptr
<
Table
<
typename
T
::
element_type
,
KeyType
>>
tbl
,
bool
autoCommit
=
true
)
throw
(
TopologyException
)
{
bool
autoCommit
=
true
)
noexcept
(
false
)
{
typedef
std
::
function
<
KeyType
(
const
T
&
)
>
KeyExtractorFunc
;
assert
(
partitioningState
==
NoPartitioning
);
...
...
@@ -1331,7 +1327,7 @@ class Pipe {
std
::
shared_ptr
<
Table
<
typename
RecordType
::
element_type
,
KeyType
>>
tbl
,
std
::
function
<
bool
(
const
T
&
,
bool
,
const
typename
RecordType
::
element_type
&
)
>
updateFunc
)
throw
(
TopologyException
)
{
updateFunc
)
noexcept
(
false
)
{
typedef
std
::
function
<
KeyType
(
const
T
&
)
>
KeyExtractorFunc
;
assert
(
partitioningState
==
NoPartitioning
);
...
...
@@ -1373,7 +1369,7 @@ class Pipe {
* @return a new pipe
*/
Pipe
<
T
>
partitionBy
(
typename
PartitionBy
<
T
>::
PartitionFunc
pFun
,
unsigned
int
nPartitions
)
throw
(
TopologyException
)
{
unsigned
int
nPartitions
)
noexcept
(
false
)
{
if
(
partitioningState
!=
NoPartitioning
)
throw
TopologyException
(
"Cannot partition an already partitioned stream."
);
...
...
@@ -1394,7 +1390,7 @@ class Pipe {
* the data stream element type consumed by PartitionBy
* @return a new pipe
*/
Pipe
<
T
>
merge
()
throw
(
TopologyException
)
{
Pipe
<
T
>
merge
()
noexcept
(
false
)
{
if
(
partitioningState
!=
NextInPartitioning
)
throw
TopologyException
(
"Nothing to merge in topology."
);
...
...
@@ -1433,7 +1429,7 @@ class Pipe {
*/
Pipe
<
T
>
barrier
(
std
::
condition_variable
&
cVar
,
std
::
mutex
&
mtx
,
typename
Barrier
<
T
>::
PredicateFunc
f
)
throw
(
TopologyException
)
{
typename
Barrier
<
T
>::
PredicateFunc
f
)
noexcept
(
false
)
{
if
(
partitioningState
==
NoPartitioning
)
{
auto
op
=
std
::
make_shared
<
Barrier
<
T
>>
(
cVar
,
mtx
,
f
);
auto
iter
=
addPublisher
<
Barrier
<
T
>
,
DataSource
<
T
>>
(
op
);
...
...
src/dsl/Topology.hpp
View file @
36d53940
...
...
@@ -242,7 +242,7 @@ namespace pfabric {
* a new pipe where the stream acts as the producer.
*/
template
<
typename
T
>
Pipe
<
T
>
fromStream
(
Dataflow
::
BaseOpPtr
stream
)
throw
(
TopologyException
)
{
Pipe
<
T
>
fromStream
(
Dataflow
::
BaseOpPtr
stream
)
noexcept
(
false
)
{
// check whether stream is a Queue<T> operator
auto
pOp
=
dynamic_cast
<
Queue
<
T
>*>
(
stream
.
get
());
if
(
pOp
==
nullptr
)
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment