/*
 * Copyright (C) 2014-2021 DBIS Group - TU Ilmenau, All Rights Reserved.
 *
 * This file is part of the PipeFabric package.
 *
 * PipeFabric is free software: you can redistribute it and/or modify it under the terms of the GNU
 * General Public License as published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * PipeFabric is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
 * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with PipeFabric. If not,
 * see <http://www.gnu.org/licenses/>.
 */

#ifndef CEPEngine_hpp_
#define CEPEngine_hpp_
//#include "EventBuffer.hpp"
#include "Instance.hpp"
//#include "Matcher.hpp"
#include <list>
#include <map>
#include "StructurePool.hpp"
#include "boost/date_time/posix_time/posix_time.hpp"
#include "NFAController.hpp"
#include "util/ValueIDMultimap.hpp"
/**
 * The main engine to process CEP
 */
namespace pfabric {
/**
 * Forward declaration of the CEP manager; CEPEngine only stores a pointer to it,
 * so the full definition is not needed in this header.
 */
template<class TinPtr, class ToutPtr, class TdepPtr>
class Matcher;
//class GCStructures;
template<class TinPtr, class ToutPtr, class TdepPtr>
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
37
38
39
40
41
class CEPEngine {
protected:
	/**
	 * CEP manager to notify the system with the matched events
	 */
42
	Matcher<TinPtr, ToutPtr, TdepPtr> *manager;
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
43
44
45
46
47
48
49
	/**
	 * statistical counter to store  number of matches
	 */
	long counter;
	/**
	 * a pool of structures
	 */
50
	StructurePool<TinPtr, ToutPtr, TdepPtr>* pool;
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
51
52
53
	/**
	 * to be deleted structures, these can be outputted structures or violated time constraint structures
	 */
54
	std::list<typename NFAStructure<TinPtr, ToutPtr, TdepPtr>::NFAStructurePtr> deletedStructures;
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
55
56
57
	/**
	 * the global NFA
	 */
58
	typename NFAController<TinPtr, ToutPtr, TdepPtr>::NFAControllerPtr  nfa;
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
	/**
	 * create new structure, the event in the parameter will be the first matched event
	 * @param event an event to start the structure (sequence) with it
	 */
	void createStartStructure(const TinPtr& event);

	/**
	 * the partition structure which the structures can be stored in the pool accordingly
	 */
	Partition<TinPtr>* equalityPar;
	/**
	 * sequence collector to remove unnecessary sequences
	 */
	//GCStructures* gc;

	/**
	 * an indicator to notify the sequence collector to remove garbage sequences
	 */

	boost::atomic<bool> cgIndicator;

	/**
	 * check the time window constraint (within) for the current event and a structure
	 * @param event
	 * @param str
	 * @return
	 */
86
	bool checkWindowTime(const TinPtr& event, const typename NFAStructure<TinPtr, ToutPtr, TdepPtr>::NFAStructurePtr& str);
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
87
88
89
90
91
92
	/**
	 * check edge predicates
	 * @param event
	 * @param str
	 * @return
	 */
93
94
	int checkPredicate(const TinPtr& event, const typename NFAStructure<TinPtr, ToutPtr, TdepPtr>::NFAStructurePtr& str,
			typename NFAState<TinPtr>::StateType &type);
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
95
96
97
98
99
public:
	/**
	 * constructor to receive the CEP manager
	 * @param manager
	 */
100
	CEPEngine(Matcher<TinPtr, ToutPtr, TdepPtr> *manager): counter(0), cgIndicator(false) {
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
101
		this->manager = manager;
102
103
		this->nfa = typename NFAController<TinPtr, ToutPtr, TdepPtr>::NFAControllerPtr (new NFAController<TinPtr, ToutPtr, TdepPtr>());
		this->pool = new StructurePool<TinPtr, ToutPtr, TdepPtr>();
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
		this->equalityPar = new SequencePartition<TinPtr>;
		windowConst = new WindowStruct;
		windowConst->window = WindowStruct::NoConstraint;
	}
	/**
	 * destructor
	 */
	virtual ~CEPEngine();
	/**
	 * print number of matches
	 * @param os the output stream object
	 */
	virtual void printNumMatches(std::ostream& os) {
		os << this->pool->getNumInsertions() << " " << this->pool->size();
	}

public:

	struct WindowStruct {
		enum WindowContstant {
			FirstLastEvents, FromLastEvents, FromToEvents, NoConstraint
		};
		WindowContstant window;
		int eventFrom;
		int eventTo;
		long period;
	};
	/**
	 * virtual function to implement different processing approaches such as next matches, all matches ....
	 * @param event the current event
	 * @param str a structure to investigate the event with respect to it
	 */
	virtual void runEngine(const TinPtr& event)=0;
	/**
	 * get the structures pool
	 * @return structures pool
	 */
141
	StructurePool<TinPtr, ToutPtr, TdepPtr>* getStructurePool() const {
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
142
143
144
145
146
147
		return this->pool;
	}
	/**
	 * set the structures pool
	 * @param pool the structures pool
	 */
148
	void setStructurePool(StructurePool<TinPtr, ToutPtr, TdepPtr>* pool) {
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
149
150
151
152
153
154
		this->pool = pool ;
	}
	/**
	 * get the structures to be deleted
	 * @return the structures to be deleted
	 */
155
	std::list<typename NFAStructure<TinPtr, ToutPtr, TdepPtr>::NFAStructurePtr> getDeletedStructures() const {
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
156
157
158
159
160
161
		return this->deletedStructures;
	}
	/**
	 *  set the structures to be deleted
	 * @param deletedStructures the structures to be deleted
	 */
162
	void setDeletedStructures(std::list<typename NFAStructure<TinPtr, ToutPtr, TdepPtr>::NFAStructurePtr> deletedStructures) {
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
163
164
165
166
167
168
		this->deletedStructures = deletedStructures;
	}
	/**
	 * get our working NFA
	 * @return  our working NFA
	 */
169
	const typename NFAController<TinPtr, ToutPtr, TdepPtr>::NFAControllerPtr  getNFA() const {
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
170
171
172
173
174
175
		return this->nfa;
	}
	/**
	 * set our working NFA
	 * @param nfa our working NFA
	 */
Omran Saleh's avatar
Omran Saleh committed
176
	void setNFA(typename NFAController<TinPtr, ToutPtr, TdepPtr>::NFAControllerPtr nfa) { this->nfa = nfa; }
Kai-Uwe Sattler's avatar
Kai-Uwe Sattler committed
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
	/**
	 * clean unwanted structures from current run
	 */
	void runGCstructures();
	/**
	 * return the number of matches
	 */
	long getNumMatches() const {
		return this->counter;
	}
	/**
	 * set the partition object itself
	 */
	void setEquality(Partition<TinPtr>* equality) {
		if (equalityPar)
			delete equalityPar;
		equalityPar = equality;
	}
	/**
	 * set the window constraint parameters by the cep engine
	 */
	void setWindowConstraint(long period, int fromEvent = -1, int toEvent = -1);
	/**
	 * get the window from the cep engine
	 */
	WindowStruct* getWindow() const { return this->windowConst; }

	/**
	 * return if the CEP has window constraint
	 */
	bool hasWindow() { return !(windowConst->window == WindowStruct::NoConstraint); }
protected:

	/**
	 * specify the window (within) constraint
	 */
	WindowStruct* windowConst;
};
}

namespace pfabric {


/**
 * Release the objects owned by the engine: the structure pool, the equality
 * partition and the window descriptor.
 */
template<class TinPtr, class ToutPtr, class TdepPtr>
CEPEngine<TinPtr, ToutPtr, TdepPtr>::~CEPEngine() {
	delete this->pool;
	delete this->equalityPar;
	delete this->windowConst;
}

/**
 * Check the time window (within) constraint for the current event and a structure.
 *
 * The actual evaluation is currently disabled, so every event is accepted
 * regardless of the configured window. The disabled logic is kept below for
 * reference.
 *
 * @param event the current event (unused while the check is disabled)
 * @param str the structure the event would be checked against (unused while disabled)
 * @return always true
 */
template<class TinPtr, class ToutPtr, class TdepPtr>
bool CEPEngine<TinPtr, ToutPtr, TdepPtr>::checkWindowTime(const TinPtr& event,
		const typename NFAStructure<TinPtr, ToutPtr, TdepPtr>::NFAStructurePtr& str) {
	// Disabled implementation:
	//
	//	if (windowConst->window == WindowStruct::FirstLastEvents) {
	//		if (event->timestamp() - str->getFirstEventTimestamp()
	//				<= windowConst->period) {
	//			return true;
	//		}
	//	} else if (windowConst->window == WindowStruct::FromLastEvents) {
	//		if (event->timestamp()
	//				- str->getEventTimestamp(windowConst->eventFrom)
	//				<= windowConst->period)
	//			return true;
	//	} else if (windowConst->window == WindowStruct::FromToEvents) {
	//		if (str->getEventTimestamp(windowConst->eventTo)
	//				- str->getEventTimestamp(windowConst->eventFrom)
	//				<= windowConst->period)
	//			return true;
	//	}
	return true;
}
/**
 * Evaluate the edges of the structure's current state against an event.
 *
 * - Normal state: the forward edges are evaluated in index order.
 * - Kleene state: the forward edges are evaluated only if the state's
 *   specification allows leaving it; if no forward edge is taken, the loop
 *   edge is evaluated and, where the specification permits, the event is added
 *   to the structure through that loop edge.
 * - Negation state: the chain of consecutive negation states is followed via
 *   forward edge 0; a match on any edge of a negation state reports a negation
 *   through @p type.
 *
 * @param event the current event
 * @param str the structure whose current state is inspected; it may be modified
 *        (addEvent() in the Kleene case, setCurrentState() in the Negation case)
 * @param type output parameter; set to NFAState<TinPtr>::Negation only when an
 *        edge of a negation state matched, left untouched otherwise
 * @return the index of the first matching forward edge; 0 if the negation chain
 *         ends in a final state; -1 if no forward edge is taken
 */
template<class TinPtr, class ToutPtr, class TdepPtr>
int CEPEngine<TinPtr, ToutPtr, TdepPtr>::checkPredicate(const TinPtr& event,
		const typename NFAStructure<TinPtr, ToutPtr, TdepPtr>::NFAStructurePtr& str,
		typename NFAState<TinPtr>::StateType &type) {
	// The state type is read once; nothing before the branch selection below
	// modifies the structure.
	const auto stateType = str->getCurrentState()->getStateType();

	if (stateType == NFAState<TinPtr>::Normal) {
		auto current = (NormalState<TinPtr, ToutPtr, TdepPtr>*)str->getCurrentState();
		for (int i = 0; i < current->getNumEdges(); i++) {
			if (current->getForwardEdgeByIndex(i)->evaluate(event, str)) {
				return i;
			}
		}
	} else if (stateType == NFAState<TinPtr>::Kleene) {
		auto current = (KleeneState<TinPtr, ToutPtr, TdepPtr>*)(
				str->getCurrentState());

		// Step 1: decide whether the state may be left via a forward edge.
		// NOTE(review): for Plus the forward edges are tried only when
		// getCurrentKleene() == 1 and the loop edge (below) is taken only when
		// it is 0 - confirm that limiting Plus to this is intended.
		bool forwardOK = false;
		switch (current->getSpecification()) {
		case KleeneState<TinPtr, ToutPtr, TdepPtr>::Star:
		case KleeneState<TinPtr, ToutPtr, TdepPtr>::Question: {
			forwardOK = true;
			break;
		}
		case KleeneState<TinPtr, ToutPtr, TdepPtr>::Plus: {
			if (str->getCurrentKleene(current) == 1) {
				forwardOK = true;
			}
			break;
		}
		case KleeneState<TinPtr, ToutPtr, TdepPtr>::Restricted: {
			if (str->getCurrentKleene(current)
					>= current->getLoopEdge()->getNumOfLoop()) {
				forwardOK = true;
			}
			break;
		}
		}
		if (forwardOK) {
			for (int i = 0; i < current->getNumEdges(); i++) {
				if (current->getForwardEdgeByIndex(i)->evaluate(event, str)) {
					return i;
				}
			}
		}

		// Step 2: no forward edge was taken, so try to stay in the state by
		// consuming the event through the loop edge. The function still
		// returns -1 afterwards because no forward edge was taken.
		auto loop = current->getLoopEdge();
		if (loop->evaluate(event, str)) {
			switch (current->getSpecification()) {
			case KleeneState<TinPtr, ToutPtr, TdepPtr>::Star:
			case KleeneState<TinPtr, ToutPtr, TdepPtr>::Question: {
				str->addEvent(event, loop);
				break;
			}
			case KleeneState<TinPtr, ToutPtr, TdepPtr>::Plus: {
				if (str->getCurrentKleene(current) == 0) {
					str->addEvent(event, loop);
				}
				break;
			}
			case KleeneState<TinPtr, ToutPtr, TdepPtr>::Restricted: {
				if (str->getCurrentKleene(current)
						< current->getLoopEdge()->getNumOfLoop()) {
					str->addEvent(event, loop);
				}
				break;
			}
			}
		}
	} else if (stateType == NFAState<TinPtr>::Negation) {
		auto current = (NormalState<TinPtr, ToutPtr, TdepPtr>*)str->getCurrentState();
		// Walk over the chain of consecutive negation states. If the event
		// matches any edge of one of them, report the negation to the caller.
		// NOTE(review): forward edge 0 is dereferenced without checking
		// getNumEdges() - presumably every negation state has at least one edge.
		do {
			for (int i = 0; i < current->getNumEdges(); i++) {
				if (current->getForwardEdgeByIndex(i)->evaluate(event, str)) {
					type = NFAState<TinPtr>::Negation;
					return -1;
				}
			}
			current = (NormalState<TinPtr, ToutPtr, TdepPtr>*)(
					current->getForwardEdgeByIndex(0)->getDestState());
		} while (current->getStateType() == NFAState<TinPtr>::Negation);

		if (current->getStateType() == NFAState<TinPtr>::Final)
			return 0;

		// The first non-negation state after the chain: evaluate its forward
		// edges and make it the structure's current state on a match.
		for (int i = 0; i < current->getNumEdges(); i++) {
			if (current->getForwardEdgeByIndex(i)->evaluate(event, str)) {
				str->setCurrentState(current);
				return i;
			}
		}
	}
	return -1;
}

/**
 * Remove every structure queued in deletedStructures from the structure pool
 * and empty the queue.
 *
 * For each queued structure the pool entries stored under the structure's
 * equality value are scanned and the entry holding that structure is removed.
 */
template<class TinPtr, class ToutPtr, class TdepPtr>
void CEPEngine<TinPtr, ToutPtr, TdepPtr>::runGCstructures() {
	typedef typename NFAStructure<TinPtr, ToutPtr, TdepPtr>::NFAStructurePtr StructurePtr;
	typedef ValueIDMultimap<StructurePtr, TinPtr> StructureMap;

	// Drain the list until it is empty. A counting loop bounded by size()
	// would stop after about half of the elements, because pop_front() shrinks
	// the list while the counter grows.
	while (!this->deletedStructures.empty()) {
		// The reference stays valid until the pop_front() at the end of the iteration.
		const StructurePtr& str = this->deletedStructures.front();
		auto par = str->getEqualityValue();
		typename StructureMap::MultimapPair iterPair = this->pool->getValue(par);

		for (typename StructureMap::MultimapConstIterator it = iterPair.first;
				it != iterPair.second; ++it) {
			if (it->second == str) {
				// The iterator is not used after removal: leave the loop right away.
				this->pool->removeValue(it);
				break;
			}
		}
		this->deletedStructures.pop_front();
	}
}
/**
 * Start a new structure (sequence) if the event satisfies one of the forward
 * edges of the NFA's start state.
 *
 * The edges are evaluated in index order without a structure (null); for the
 * first matching edge the partition values are generated from the event, a
 * structure is obtained from the pool with a clone of the partition, and the
 * event is added to it through that edge. At most one structure is created.
 *
 * @param event an event to start the structure (sequence) with it
 */
template<class TinPtr, class ToutPtr, class TdepPtr>
void CEPEngine<TinPtr, ToutPtr, TdepPtr>::createStartStructure(const TinPtr& event) {
	auto start = this->nfa->getStartState();
	for (int i = 0; i < start->getNumEdges(); i++) {
		auto edge = start->getForwardEdgeByIndex(i);
		if (!edge->evaluate(event, nullptr))
			continue;

		equalityPar->generateValues(event);
		auto newStructure = this->pool->getStructure(
				this->nfa, equalityPar->clone());
		newStructure->addEvent(event, edge);
		break;
	}
}


/**
 * Configure the window (within) constraint.
 *
 * The kind of window is derived from which event indices are given, with -1
 * meaning "not specified":
 *  - toEvent given                      -> FromToEvents
 *  - only fromEvent given               -> FromLastEvents
 *  - neither given                      -> FirstLastEvents
 *
 * NOTE(review): a call with toEvent given but fromEvent == -1 selects
 * FromToEvents with eventFrom == -1 - confirm callers never do this.
 *
 * @param period the length of the window
 * @param fromEvent index of the event the window starts from, -1 if not specified
 * @param toEvent index of the event the window ends with, -1 if not specified
 */
template<class TinPtr, class ToutPtr, class TdepPtr>
void CEPEngine<TinPtr, ToutPtr, TdepPtr>::setWindowConstraint(long period, int fromEvent,
		int toEvent) {
	windowConst->period = period;
	windowConst->eventFrom = fromEvent;
	windowConst->eventTo = toEvent;

	if (toEvent != -1) {
		windowConst->window = WindowStruct::FromToEvents;
	} else if (fromEvent != -1) {
		windowConst->window = WindowStruct::FromLastEvents;
	} else {
		windowConst->window = WindowStruct::FirstLastEvents;
	}
}
}

#endif /* CEPEngine_hpp_ */