Matcher.hpp 12.3 KB
Newer Older
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
1
/*
2
 * Copyright (C) 2014-2021 DBIS Group - TU Ilmenau, All Rights Reserved.
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
3
4
5
 *
 * This file is part of the PipeFabric package.
 *
6
7
8
 * PipeFabric is free software: you can redistribute it and/or modify it under the terms of the GNU
 * General Public License as published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
9
 *
10
11
12
 * PipeFabric is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
 * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
13
 *
14
15
 * You should have received a copy of the GNU General Public License along with PipeFabric. If not,
 * see <http://www.gnu.org/licenses/>.
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
16
17
18
19
 */

#ifndef  Matcher_hpp_
#define  Matcher_hpp_
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
20

Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
21
22
23
24
25
26
27
#include "CEPEngine.hpp"
#include "engine/FirstMatchEngine.hpp"
#include "engine/NextMatchEngine.hpp"

#include "qop/UnaryTransform.hpp"
#include "MatchProducer.hpp"
#include "util/Partition.hpp"
28
#include "edge/NFAEdge.hpp"
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
29
#include "state/NFAState.hpp"
30

Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
31
#include "dsl/CEPState.hpp"
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70

/**
 * @brief The matcher operator for detecting complex events.
 *
 * This operator implements the complex event processing of tuple streams. This class aims to run a particular engine according to the selected strategy
 * to process the incoming tuples incrementally. Then, it produces the matches (the results) according to selected output strategy.
 * The matcher operator is implemented by Nondeterministic Finite Automata (NFA) to evaluate event patterns over tuple streams in a way similar to regular expressions
 */
namespace pfabric {
template<typename InputStreamElement, typename OutputStreamElement,
		typename EventDependency>
class Matcher: public UnaryTransform<InputStreamElement, OutputStreamElement> // use default unary transform
{

	typedef UnaryTransform<InputStreamElement, OutputStreamElement> TransformBase;
	typedef typename TransformBase::InputDataChannel InputDataChannel;
	typedef typename TransformBase::InputPunctuationChannel InputPunctuationChannel;
	typedef typename TransformBase::InputDataElementTraits InputDataElementTraits;

public:
	/**
	 * The available selection strategy, the tuples matcher engine would be run accordingly
	 * see the engine folder
	 */
	enum SelectionStrategy {
		NextMatches, AllMatches, ContiyuityMatches, FirstMatch, RecentMatch
	};
	/**
	 * The available output strategy which the output would be generated accordingly,
	 * because the output of this operator is a complex event or a combination of tuples
	 * one by one means generate the tuples one after another, in this case the resulting tuples
	 * have fixed schema, whereas combined strategy combines all tuples (complex event) in one big tuple
	 * which has variable schema
	 *
	 */
	enum OutputStrategy {
		OneByOne, Combined
	};

71
72
	typedef std::map<std::string, typename NFAEdge<InputStreamElement, OutputStreamElement,
	 			EventDependency>::EdgePredicate> PredicateMap;
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
private:
	/**
	 * The current working engine
	 */
	CEPEngine<InputStreamElement, OutputStreamElement, EventDependency>* engine;
	/**
	 * The strategy to process the incoming events
	 */
	Matcher::SelectionStrategy strategy;
	/**
	 * The output strategy to specify the form of output (one by one or combined)
	 */
	Matcher::OutputStrategy outStrategy;
	/**
	 * An object to create the matched events
	 */
	typename MatchProducer<InputStreamElement, OutputStreamElement,
			EventDependency>::MatchProducerPtr matcher;
	/**
	 * Create CEP engine according to an assigned strategy
	 */
	void createEngine();

	/**
	 * @brief Bind the callback for the data channel.
	 */
	BIND_INPUT_CHANNEL_DEFAULT( InputDataChannel, Matcher, processDataElement );

	/**
	 * @brief Bind the callback for the punctuation channel.
	 */
	BIND_INPUT_CHANNEL_DEFAULT( InputPunctuationChannel, Matcher, processPunctuation );
105

Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
public:

	/**
	 * A constructor to specify the selected strategy to run the matcher accordingly and
	 * to specify the way of generating the resulting tuples.
	 *
	 * @param selectStr the selected strategy (default Matcher::FirstMatch)
	 * @param outStr output strategy (default Matcher::OneBYOne)
	 */
	Matcher(Matcher::SelectionStrategy selectStr = Matcher::FirstMatch,
			Matcher::OutputStrategy outStr = Matcher::OneByOne) {
		setSelectionStrategy(selectStr);
		setOutputStrategy(outStr);
		createEngine();
		matcher = typename MatchProducer<InputStreamElement, OutputStreamElement, EventDependency>::MatchProducerPtr(new MatchProducer<InputStreamElement, OutputStreamElement, EventDependency>());
		assert(matcher);
	}
	/**
	 * A destructor to release the resources and clean-up
	 */
	virtual ~Matcher() {delete engine;}
127

Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
128
	void constructNFA(CEPState<InputStreamElement, EventDependency>&  expr);
129
130


Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
	/**
	 * Get the current running engine according to the selected strategy
	 * @return the current running engine
	 */
	CEPEngine<InputStreamElement, OutputStreamElement, EventDependency>* getEngine() const {return this->engine;}
	/**
	 * get the selection strategy whether NextMatches, AllMatches, ContiyuityMatches, FirstMatch or RecentMatch
	 * @return the selection strategy
	 */
	Matcher::SelectionStrategy getSelectionStrategy() const;
	/**
	 * get the selection strategy whether NextMatches, AllMatches, ContiyuityMatches, FirstMatch or RecentMatch as string
	 * @return the selection strategy as string
	 */
	std::string getSelectionStrategyString() const {return strategy;}
	/**
	 * set the selection strategy
	 * @param the selection strategy
	 *
	 */
	void setSelectionStrategy(Matcher::SelectionStrategy strategy) {
		if (strategy >= Matcher::NextMatches && strategy <= Matcher::RecentMatch)
		this->strategy = strategy;
		else
		this->strategy = Matcher::FirstMatch;
	}

	/**
	 * @brief This method is invoked when a punctuation arrives.
	 *
	 * It simply forwards the punctuation to the subscribers.
	 *
	 * @param[in] punctuation
	 *    the incoming punctuation tuple
	 */
	void processPunctuation( const PunctuationPtr& punctuation ) {
		this->getOutputPunctuationChannel().publish(punctuation);
	}

	/**
	 * This method is invoked when a data stream element arrives.
	 *
	 * It applies the projection function and forwards the projected element to its subscribers.
	 *
	 * @param[in] data
	 *    the incoming stream element
	 * @param[in] outdated
	 *    flag indicating whether the tuple is new or invalidated now
	 */
	void processDataElement( const InputStreamElement& data, const bool outdated ) {
		engine->runEngine(data);
	}
	/**
	 * Print information about this operator
	 * @param os the output stream object
	 */
	void printInfo(std::ostream& os) const {
		//TODO
	}
	/**
	 * Set the output strategy either one by one or combined
	 * @param the output strategy
	 */
	void setOutputStrategy(Matcher::OutputStrategy str) {
		if (str >= Matcher::OneByOne && str <= Matcher::Combined)
		this->outStrategy = str;
		else
		this->outStrategy = Matcher::OneByOne;
	}
	/**
	 * Get the output strategy
	 * @return the output strategy
	 */
	Matcher::OutputStrategy getOutputStrategy() const {return outStrategy;}
	/**
	 *  Since our systems depends on NFA concept. This methods return a pointer to NFA builder or controller
	 *  @return return our main NFA
	 */
Omran Saleh's avatar
Omran Saleh committed
209
	void setNFAController(typename NFAController<InputStreamElement, OutputStreamElement, EventDependency>::NFAControllerPtr nfa)  { engine->setNFA(nfa);}
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
	const typename NFAController<InputStreamElement, OutputStreamElement, EventDependency>::NFAControllerPtr getNFAController() const {return engine->getNFA();}
	/**
	 * set the window constraint parameters by the CEP engine which implemented by within cluase
	 * @param period the time needed to detect the complex event within
	 * @param fromEvent
	 * @param tovent
	 */
	void setWindowConstraint(long period, int fromEvent = -1, int toEvent =
			-1) {
		assert(fromEvent >= toEvent);
		engine->setWindowConstraint(period, fromEvent, toEvent);
	}
	/**
	 * Get the window information from the matcher engine
	 */
	typename CEPEngine<InputStreamElement, OutputStreamElement, EventDependency>::WindowStruct* getWindow() const {return engine->getWindow();}

	/**
	 * Set the partition object itself
	 */
	void setEqulity(Partition<InputStreamElement>* par) {
		if (engine)
		engine->setEquality(par);
	}
	/**
	 * Publish the new matches or the complex event (combination of tuples or one by one) to a next operator
	 * Once the engine detects the complex event, it publishes the result to the next operator in the chain
	 * The matches are exist in structurePtr object which responsible for storing the matches
	 * @param matches the matches object
	 */
240
	void publishResultMatches(const typename NFAStructure<InputStreamElement, OutputStreamElement, EventDependency>::NFAStructurePtr& matches);
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
};
}

namespace pfabric {
template<class InputStreamElement, class OutputStreamElement,
	class EventDependency>
void Matcher<InputStreamElement, OutputStreamElement, EventDependency>::createEngine() {
if (strategy == Matcher::FirstMatch)
	engine = new FirstMatchEngine<InputStreamElement, OutputStreamElement,
			EventDependency>(this);
else if (strategy == Matcher::NextMatches) {
	engine = new NextMatchEngine<InputStreamElement, OutputStreamElement,
			EventDependency>(this);
}
/*else if (strategy == Matcher::RecentMatch) {
 engine = new RecentMatchEngine(this);
 } else if (strategy == Matcher::AllMatches) {
 engine = new AllMatchesEngine(this);
 } else if (strategy == Matcher::ContiyuityMatches) {
 engine = new ContiguityMatchesEngine(this);
 }*/
}

template<class InputStreamElement, class OutputStreamElement,
	class EventDependency>
266
void Matcher<InputStreamElement, OutputStreamElement, EventDependency>::publishResultMatches(
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
267
268
269
270
271
272
273
274
275
276
277
278
279
	const typename NFAStructure<InputStreamElement, OutputStreamElement,
			EventDependency>::NFAStructurePtr& matches) {

if (outStrategy == Matcher::Combined) {
}
//this->template getOutputChannel<0>().publish((matcher->produceTogether(matches), false));
else {
	typename MatchProducer<InputStreamElement, OutputStreamElement,
			EventDependency>::matchesList list = matches->getEvents();
	for (typename MatchProducer<InputStreamElement, OutputStreamElement,
			EventDependency>::matchConstIterator i = list.begin();
			i != list.end(); i++) {
		//std::cout << (*i)->convertInstanceToTuple() << std::endl;
280
		this->getOutputDataChannel().publish( (*i)->convertInstanceToTuple(), false);
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
281
282
283
284
	}
}
}

285
286
287
template<class InputStreamElement, class OutputStreamElement, class EventDependency>
void Matcher<InputStreamElement,
	OutputStreamElement,
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
288
289
290
291
292
293
	EventDependency>::constructNFA(CEPState<InputStreamElement, EventDependency>& expr) {
		typedef CEPState<InputStreamElement, EventDependency> MyCEPState;
		typedef typename CEPState<InputStreamElement, EventDependency>::CEPStatePtr CEPStatePtr;
		typedef typename NFAState<InputStreamElement>::StatePtr NFAStatePtr;
		typedef typename NFAEdge<InputStreamElement, OutputStreamElement,
			EventDependency>::NFAEdgePtr NFAEdgePtr;
294

Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
295
296
297
298
299
300
301
		struct StateInfo {
			StateInfo() {}
			StateInfo(CEPStatePtr ptr) : cptr(ptr), nptr(nullptr), eptr(nullptr) {}
			CEPStatePtr cptr;
			NFAStatePtr nptr;
			NFAEdgePtr eptr;
		};
302

Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
303
		std::map<int, StateInfo> states;
304

Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
305
306
		auto nfa = getNFAController();
		auto exprTable = expr.exprTable();
307

Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
308
309
310
311
312
313
314
315
		// extract the set of states as well as start and end
		for (auto& ex : exprTable) {
			states[ex.fromState->ID()] = StateInfo(ex.fromState);
			if (ex.toState != nullptr) states[ex.toState->ID()] = StateInfo(ex.toState);
		}
		// foreach state: nfa->createNormalState();
		for (auto& p : states) {
			auto id = std::to_string(p.first);
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
316
			if (p.first == 0) {
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
317
				p.second.nptr = nfa->createStartState(id);
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
318
319
			}
			else if (p.first == 1000) {
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
320
				p.second.nptr = nfa->createFinalState(id);
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
321
322
        p.second.eptr = nfa->createForwardEdge(p.second.cptr->predicate());
			}
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
323
324
325
326
327
328
329
			else {
				// TODO: handle negateState
				p.second.nptr = nfa->createNormalState(id);
				// foreach state: nfa->createForwardEdge();
				p.second.eptr = nfa->createForwardEdge(p.second.cptr->predicate());
			}
		}
330

Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
331
332
333
334
335
336
337
338
339
340
341
		for (auto& ex : exprTable) {
			// foreach expr: nfa->createForwardTransition();
			if (ex.op == MyCEPState::SEQ) {
				auto beginState = states[ex.fromState->ID()].nptr;
				auto endState = states[ex.toState->ID()].nptr;
        auto predicate = states[ex.toState->ID()].eptr;
				nfa->createForwardTransition(beginState, predicate, endState);
			}
			else if (ex.op == MyCEPState::OR) {
				// TODO handle OR
			}
342
		}
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
343
		nfa->print();
344
345
	}

Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
346
347
}
#endif /*  Matcher_hpp_ */